use std::{cell::OnceCell, path::Path, sync::Arc};
use interrupt_support::{SqlInterruptHandle, SqlInterruptScope};
use parking_lot::{Mutex, MutexGuard};
use remote_settings::RemoteSettingsResponse;
use rusqlite::{
named_params,
types::{FromSql, ToSql},
Connection, OpenFlags, OptionalExtension,
};
use sql_support::{open_database::open_database_with_flags, repeat_sql_vars, ConnExt};
use crate::{
config::{SuggestGlobalConfig, SuggestProviderConfig},
error::RusqliteResultExt,
fakespot,
geoname::GeonameCache,
pocket::{split_keyword, KeywordConfidence},
provider::SuggestionProvider,
rs::{
DownloadedAmoSuggestion, DownloadedAmpSuggestion, DownloadedAmpWikipediaSuggestion,
DownloadedExposureSuggestion, DownloadedFakespotSuggestion, DownloadedMdnSuggestion,
DownloadedPocketSuggestion, DownloadedWikipediaSuggestion, Record, SuggestRecordId,
},
schema::{clear_database, SuggestConnectionInitializer},
suggestion::{cook_raw_suggestion_url, AmpSuggestionType, Suggestion},
util::full_keyword,
weather::WeatherCache,
Result, SuggestionQuery,
};
/// Key in the `meta` table under which the JSON-serialized global config is stored.
pub const GLOBAL_CONFIG_META_KEY: &str = "global_config";
/// Prefix of the `meta` table keys that hold per-provider JSON configs; the provider's
/// numeric (`u8`) value is appended to form the full key.
pub const PROVIDER_CONFIG_META_KEY_PREFIX: &str = "provider_config_";
/// Score used for ingested suggestions that don't specify one, and for every exposure
/// suggestion.
pub const DEFAULT_SUGGESTION_SCORE: f64 = 0.2;
/// Access mode requested when opening a `SuggestDb`.
#[derive(Clone, Copy)]
pub(crate) enum ConnectionType {
    /// Open the database read-only.
    ReadOnly,
    /// Open the database for reading and writing, creating the file if needed.
    ReadWrite,
}

impl From<ConnectionType> for OpenFlags {
    /// Maps a connection type onto SQLite open flags.
    ///
    /// Both modes enable URI filenames and `NO_MUTEX`; they differ only in the
    /// access-mode flags that are OR-ed on top.
    fn from(type_: ConnectionType) -> Self {
        let shared = OpenFlags::SQLITE_OPEN_URI | OpenFlags::SQLITE_OPEN_NO_MUTEX;
        let access = match type_ {
            ConnectionType::ReadOnly => OpenFlags::SQLITE_OPEN_READ_ONLY,
            ConnectionType::ReadWrite => {
                OpenFlags::SQLITE_OPEN_CREATE | OpenFlags::SQLITE_OPEN_READ_WRITE
            }
        };
        shared | access
    }
}
/// An SQLite extension to load when a `SuggestDb` is opened; the list of these is
/// handed to `SuggestConnectionInitializer::new`.
#[derive(Default, Clone)]
pub struct Sqlite3Extension {
/// The extension library to load.
pub library: String,
/// Optional entry-point symbol for the library.
/// NOTE(review): presumably `None` means "use SQLite's default entry point" — confirm
/// in the connection initializer.
pub entry_point: Option<String>,
}
pub(crate) struct SuggestDb {
pub conn: Mutex<Connection>,
pub interrupt_handle: Arc<SqlInterruptHandle>,
}
impl SuggestDb {
pub fn open(
path: impl AsRef<Path>,
extensions_to_load: &[Sqlite3Extension],
type_: ConnectionType,
) -> Result<Self> {
let conn = open_database_with_flags(
path,
type_.into(),
&SuggestConnectionInitializer::new(extensions_to_load),
)?;
Ok(Self::with_connection(conn))
}
fn with_connection(conn: Connection) -> Self {
let interrupt_handle = Arc::new(SqlInterruptHandle::new(&conn));
Self {
conn: Mutex::new(conn),
interrupt_handle,
}
}
pub fn read<T>(&self, op: impl FnOnce(&SuggestDao) -> Result<T>) -> Result<T> {
let conn = self.conn.lock();
let scope = self.interrupt_handle.begin_interrupt_scope()?;
let dao = SuggestDao::new(&conn, &scope);
op(&dao)
}
pub fn write<T>(&self, op: impl FnOnce(&mut SuggestDao) -> Result<T>) -> Result<T> {
let mut conn = self.conn.lock();
let scope = self.interrupt_handle.begin_interrupt_scope()?;
let tx = conn.transaction()?;
let mut dao = SuggestDao::new(&tx, &scope);
let result = op(&mut dao)?;
tx.commit()?;
Ok(result)
}
pub fn write_scope(&self) -> Result<WriteScope> {
Ok(WriteScope {
conn: self.conn.lock(),
scope: self.interrupt_handle.begin_interrupt_scope()?,
})
}
}
/// A held connection lock plus an interrupt scope, used to run several transactions
/// without releasing the lock between them.
pub(crate) struct WriteScope<'a> {
    pub conn: MutexGuard<'a, Connection>,
    pub scope: SqlInterruptScope,
}

impl WriteScope<'_> {
    /// Runs `op` with a mutable DAO in a fresh transaction, committing only on success.
    pub fn write<T>(&mut self, op: impl FnOnce(&mut SuggestDao) -> Result<T>) -> Result<T> {
        let tx = self.conn.transaction()?;
        let output = {
            let mut dao = SuggestDao::new(&tx, &self.scope);
            op(&mut dao)?
        };
        tx.commit()?;
        Ok(output)
    }

    /// Runs `op` with a shared DAO in a fresh transaction, which is committed when
    /// `op` succeeds.
    pub fn read<T>(&mut self, op: impl FnOnce(&SuggestDao) -> Result<T>) -> Result<T> {
        let tx = self.conn.transaction()?;
        let output = {
            let dao = SuggestDao::new(&tx, &self.scope);
            op(&dao)?
        };
        tx.commit()?;
        Ok(output)
    }

    /// Returns an error if the interrupt scope has been interrupted.
    pub fn err_if_interrupted(&self) -> Result<()> {
        self.scope.err_if_interrupted().map_err(Into::into)
    }
}
/// Data-access object over a connection (a transaction, when created by the `write`
/// helpers) plus the interrupt scope checked during long operations.
pub(crate) struct SuggestDao<'a> {
pub conn: &'a Connection,
pub scope: &'a SqlInterruptScope,
/// Weather cache; starts empty in `new` and is reset by `drop_suggestions`.
pub weather_cache: OnceCell<WeatherCache>,
/// Geoname cache; starts empty in `new` and is reset by `drop_suggestions`.
pub geoname_cache: OnceCell<GeonameCache>,
}
impl<'a> SuggestDao<'a> {
fn new(conn: &'a Connection, scope: &'a SqlInterruptScope) -> Self {
Self {
conn,
scope,
weather_cache: std::cell::OnceCell::new(),
geoname_cache: std::cell::OnceCell::new(),
}
}
/// Removes everything ingested from `record_id`.
///
/// If the record id maps to an icon id, only that icon is dropped; otherwise all
/// suggestion data stored for the record is dropped.
pub fn delete_record_data(&mut self, record_id: &SuggestRecordId) -> Result<()> {
    if let Some(icon_id) = record_id.as_icon_id() {
        self.drop_icon(icon_id)
    } else {
        self.drop_suggestions(record_id)
    }
}
/// Returns the cached remote-settings response for `collection`, if any.
///
/// Best effort: a failed read is reported via `report_error!` and treated as a miss.
pub fn read_cached_rs_data(&self, collection: &str) -> Option<RemoteSettingsResponse> {
    self.try_read_cached_rs_data(collection).unwrap_or_else(|e| {
        error_support::report_error!("suggest-rs-cache-read", "{e}");
        None
    })
}
/// Caches `data` as the remote-settings response for `collection`.
///
/// Best effort: a failed write is reported via `report_error!` instead of being returned.
pub fn write_cached_rs_data(&mut self, collection: &str, data: &RemoteSettingsResponse) {
    let outcome = self.try_write_cached_rs_data(collection, data);
    if let Some(e) = outcome.err() {
        error_support::report_error!("suggest-rs-cache-write", "{e}");
    }
}
/// Loads and MessagePack-decodes the cached response for `collection`.
///
/// Returns `Ok(None)` when no row exists for the collection.
fn try_read_cached_rs_data(&self, collection: &str) -> Result<Option<RemoteSettingsResponse>> {
    let mut stmt = self
        .conn
        .prepare_cached("SELECT data FROM rs_cache WHERE collection = ?")?;
    let Some(encoded) = stmt
        .query_row((collection,), |row| row.get::<_, Vec<u8>>(0))
        .optional()?
    else {
        return Ok(None);
    };
    let decoded = rmp_serde::decode::from_slice(encoded.as_slice())?;
    Ok(Some(decoded))
}
/// MessagePack-encodes `data` and upserts it as the cached response for `collection`.
fn try_write_cached_rs_data(
    &mut self,
    collection: &str,
    data: &RemoteSettingsResponse,
) -> Result<()> {
    let mut upsert = self
        .conn
        .prepare_cached("INSERT OR REPLACE INTO rs_cache(collection, data) VALUES(?, ?)")?;
    let encoded = rmp_serde::encode::to_vec(data)?;
    upsert.execute((collection, encoded))?;
    Ok(())
}
/// Returns every row of the `ingested_records` table.
pub fn get_ingested_records(&self) -> Result<Vec<IngestedRecord>> {
    let mut stmt = self
        .conn
        .prepare_cached("SELECT id, collection, type, last_modified FROM ingested_records")?;
    let records = stmt
        .query_and_then((), IngestedRecord::from_row)?
        .collect::<Result<Vec<_>>>()?;
    Ok(records)
}
/// Updates the ingestion bookkeeping for `collection`.
///
/// Rows for `deleted_records` are removed first. Rows for `new_records` and then
/// `updated_records` are upserted with their record type and last-modified time.
pub fn update_ingested_records(
    &mut self,
    collection: &str,
    new_records: &[&Record],
    updated_records: &[&Record],
    deleted_records: &[&IngestedRecord],
) -> Result<()> {
    let mut delete_row = self
        .conn
        .prepare_cached("DELETE FROM ingested_records WHERE collection = ? AND id = ?")?;
    for gone in deleted_records.iter() {
        delete_row.execute((collection, gone.id.as_str()))?;
    }

    let mut upsert_row = self.conn.prepare_cached(
        "INSERT OR REPLACE INTO ingested_records(id, collection, type, last_modified) VALUES(?, ?, ?, ?)",
    )?;
    let touched = new_records.iter().chain(updated_records.iter());
    for current in touched {
        upsert_row.execute((
            current.id.as_str(),
            collection,
            current.record_type().as_str(),
            current.last_modified,
        ))?;
    }
    Ok(())
}
/// Sets `last_modified` to 1 on every ingested record.
///
/// NOTE(review): presumably this makes every record look stale so the next ingest
/// processes it again — confirm against the ingestion code.
pub fn force_reingest(&mut self) -> Result<()> {
    let mut stmt = self
        .conn
        .prepare_cached("UPDATE ingested_records SET last_modified=1")?;
    stmt.execute(())?;
    Ok(())
}
/// Returns `true` when the `suggestions` table has no rows.
pub fn suggestions_table_empty(&self) -> Result<bool> {
    let is_empty = self
        .conn
        .query_one::<bool>("SELECT NOT EXISTS (SELECT 1 FROM suggestions)")?;
    Ok(is_empty)
}
/// Fetches AMP suggestions whose keyword exactly equals the lowercased query keyword.
///
/// `suggestion_type` selects the provider that is queried (`AmpMobile` for `Mobile`,
/// `Amp` for `Desktop`). Suggestions whose URL appears in `dismissed_suggestions` are
/// skipped. For each match, this also loads:
/// - the suggestion's keywords ranked at or after the matched keyword, and
/// - its `amp_custom_details` row, left-joined with `icons`.
///
/// The stored full keyword is used when one exists; otherwise one is derived from
/// those keywords with `full_keyword`.
///
/// # Errors
/// Any failing query aborts the whole fetch.
/// NOTE(review): the details lookup uses `query_row_and_then`, so a matched suggestion
/// without an `amp_custom_details` row presumably fails the fetch rather than being
/// skipped — confirm this is intended.
pub fn fetch_amp_suggestions(
&self,
query: &SuggestionQuery,
suggestion_type: AmpSuggestionType,
) -> Result<Vec<Suggestion>> {
let keyword_lowercased = &query.keyword.to_lowercase();
// Mobile and desktop AMP suggestions are stored under different providers.
let provider = match suggestion_type {
AmpSuggestionType::Mobile => SuggestionProvider::AmpMobile,
AmpSuggestionType::Desktop => SuggestionProvider::Amp,
};
let suggestions = self.conn.query_rows_and_then_cached(
r#"
SELECT
s.id,
k.rank,
s.title,
s.url,
s.provider,
s.score,
fk.full_keyword
FROM
suggestions s
JOIN
keywords k
ON k.suggestion_id = s.id
LEFT JOIN
full_keywords fk
ON k.full_keyword_id = fk.id
WHERE
s.provider = :provider
AND k.keyword = :keyword
AND NOT EXISTS (SELECT 1 FROM dismissed_suggestions WHERE url=s.url)
"#,
named_params! {
":keyword": keyword_lowercased,
":provider": provider
},
|row| -> Result<Suggestion> {
let suggestion_id: i64 = row.get("id")?;
let title = row.get("title")?;
let raw_url: String = row.get("url")?;
let score: f64 = row.get("score")?;
// `None` when the matched keyword has no stored full keyword (LEFT JOIN miss).
let full_keyword_from_db: Option<String> = row.get("full_keyword")?;
// Keywords at or after the matched keyword's rank, in ascending rank order; only
// used to derive a full keyword when none was stored.
let keywords: Vec<String> = self.conn.query_rows_and_then_cached(
r#"
SELECT
keyword
FROM
keywords
WHERE
suggestion_id = :suggestion_id
AND rank >= :rank
ORDER BY
rank ASC
"#,
named_params! {
":suggestion_id": suggestion_id,
":rank": row.get::<_, i64>("rank")?,
},
|row| row.get(0),
)?;
// Load the AMP-specific details (and optional icon) for this suggestion.
self.conn.query_row_and_then(
r#"
SELECT
amp.advertiser,
amp.block_id,
amp.iab_category,
amp.impression_url,
amp.click_url,
i.data AS icon,
i.mimetype AS icon_mimetype
FROM
amp_custom_details amp
LEFT JOIN
icons i ON amp.icon_id = i.id
WHERE
amp.suggestion_id = :suggestion_id
"#,
named_params! {
":suggestion_id": suggestion_id
},
|row| {
// Both the landing URL and the click URL are returned in raw form and as
// processed by `cook_raw_suggestion_url`.
let cooked_url = cook_raw_suggestion_url(&raw_url);
let raw_click_url = row.get::<_, String>("click_url")?;
let cooked_click_url = cook_raw_suggestion_url(&raw_click_url);
Ok(Suggestion::Amp {
block_id: row.get("block_id")?,
advertiser: row.get("advertiser")?,
iab_category: row.get("iab_category")?,
title,
url: cooked_url,
raw_url,
full_keyword: full_keyword_from_db
.unwrap_or_else(|| full_keyword(keyword_lowercased, &keywords)),
icon: row.get("icon")?,
icon_mimetype: row.get("icon_mimetype")?,
impression_url: row.get("impression_url")?,
click_url: cooked_click_url,
raw_click_url,
score,
})
},
)
},
)?;
Ok(suggestions)
}
/// Fetches Wikipedia suggestions whose keyword exactly equals the lowercased query
/// keyword, skipping suggestions whose URL appears in `dismissed_suggestions`.
///
/// For each match:
/// - the full keyword is derived with `full_keyword` from the suggestion's keywords
///   ranked at or after the matched one;
/// - the icon is looked up through `wikipedia_custom_details`. When no icon row is
///   found, both `icon` and `icon_mimetype` are `None`.
pub fn fetch_wikipedia_suggestions(&self, query: &SuggestionQuery) -> Result<Vec<Suggestion>> {
let keyword_lowercased = &query.keyword.to_lowercase();
let suggestions = self.conn.query_rows_and_then_cached(
r#"
SELECT
s.id,
k.rank,
s.title,
s.url
FROM
suggestions s
JOIN
keywords k
ON k.suggestion_id = s.id
WHERE
s.provider = :provider
AND k.keyword = :keyword
AND NOT EXISTS (SELECT 1 FROM dismissed_suggestions WHERE url=s.url)
"#,
named_params! {
":keyword": keyword_lowercased,
":provider": SuggestionProvider::Wikipedia
},
|row| -> Result<Suggestion> {
let suggestion_id: i64 = row.get("id")?;
let title = row.get("title")?;
let raw_url = row.get::<_, String>("url")?;
// Keywords at or after the matched keyword's rank, in ascending rank order.
let keywords: Vec<String> = self.conn.query_rows_and_then_cached(
"SELECT keyword FROM keywords
WHERE suggestion_id = :suggestion_id AND rank >= :rank
ORDER BY rank ASC",
named_params! {
":suggestion_id": suggestion_id,
":rank": row.get::<_, i64>("rank")?,
},
|row| row.get(0),
)?;
// Optional icon: `try_query_row` yields `None` when no row matches, which is
// mapped to `(None, None)`. NOTE(review): the trailing `true` argument
// presumably enables statement caching — confirm in sql_support.
let (icon, icon_mimetype) = self
.conn
.try_query_row(
"SELECT i.data, i.mimetype
FROM icons i
JOIN wikipedia_custom_details s ON s.icon_id = i.id
WHERE s.suggestion_id = :suggestion_id
LIMIT 1",
named_params! {
":suggestion_id": suggestion_id
},
|row| -> Result<_> {
Ok((
row.get::<_, Option<Vec<u8>>>(0)?,
row.get::<_, Option<String>>(1)?,
))
},
true,
)?
.unwrap_or((None, None));
Ok(Suggestion::Wikipedia {
title,
url: raw_url,
full_keyword: full_keyword(keyword_lowercased, &keywords),
icon,
icon_mimetype,
})
},
)?;
Ok(suggestions)
}
fn map_prefix_keywords<T>(
&self,
query: &SuggestionQuery,
provider: &SuggestionProvider,
mut mapper: impl FnMut(&rusqlite::Row, &str) -> Result<T>,
) -> Result<Vec<T>> {
let keyword_lowercased = &query.keyword.to_lowercase();
let (keyword_prefix, keyword_suffix) = split_keyword(keyword_lowercased);
let suggestions_limit = query.limit.unwrap_or(-1);
self.conn.query_rows_and_then_cached(
r#"
SELECT
s.id,
MAX(k.rank) AS rank,
s.title,
s.url,
s.provider,
s.score,
k.keyword_suffix
FROM
suggestions s
JOIN
prefix_keywords k
ON k.suggestion_id = s.id
WHERE
k.keyword_prefix = :keyword_prefix
AND (k.keyword_suffix BETWEEN :keyword_suffix AND :keyword_suffix || x'FFFF')
AND s.provider = :provider
AND NOT EXISTS (SELECT 1 FROM dismissed_suggestions WHERE url=s.url)
GROUP BY
s.id
ORDER BY
s.score DESC,
rank DESC
LIMIT
:suggestions_limit
"#,
&[
(":keyword_prefix", &keyword_prefix as &dyn ToSql),
(":keyword_suffix", &keyword_suffix as &dyn ToSql),
(":provider", provider as &dyn ToSql),
(":suggestions_limit", &suggestions_limit as &dyn ToSql),
],
|row| mapper(row, keyword_suffix),
)
}
/// Fetches AMO suggestions via prefix keywords, joined with `amo_custom_details`.
///
/// Rows whose stored keyword suffix does not start with the query suffix are dropped.
pub fn fetch_amo_suggestions(&self, query: &SuggestionQuery) -> Result<Vec<Suggestion>> {
    let candidates = self.map_prefix_keywords(
        query,
        &SuggestionProvider::Amo,
        |row, keyword_suffix| -> Result<Option<Suggestion>> {
            let suggestion_id: i64 = row.get("id")?;
            let title = row.get("title")?;
            let raw_url = row.get::<_, String>("url")?;
            let score = row.get::<_, f64>("score")?;
            let full_suffix = row.get::<_, String>("keyword_suffix")?;
            if !full_suffix.starts_with(keyword_suffix) {
                return Ok(None);
            }
            self.conn
                .query_row_and_then(
                    r#"
SELECT
amo.description,
amo.guid,
amo.rating,
amo.icon_url,
amo.number_of_ratings
FROM
amo_custom_details amo
WHERE
amo.suggestion_id = :suggestion_id
"#,
                    named_params! {
                        ":suggestion_id": suggestion_id
                    },
                    |details| {
                        Ok(Suggestion::Amo {
                            title,
                            url: raw_url,
                            icon_url: details.get("icon_url")?,
                            description: details.get("description")?,
                            rating: details.get("rating")?,
                            number_of_ratings: details.get("number_of_ratings")?,
                            guid: details.get("guid")?,
                            score,
                        })
                    },
                )
                .map(Some)
        },
    )?;
    Ok(candidates.into_iter().flatten().collect())
}
pub fn fetch_pocket_suggestions(&self, query: &SuggestionQuery) -> Result<Vec<Suggestion>> {
let keyword_lowercased = &query.keyword.to_lowercase();
let (keyword_prefix, keyword_suffix) = split_keyword(keyword_lowercased);
let suggestions = self
.conn
.query_rows_and_then_cached(
r#"
SELECT
s.id,
MAX(k.rank) AS rank,
s.title,
s.url,
s.provider,
s.score,
k.confidence,
k.keyword_suffix
FROM
suggestions s
JOIN
prefix_keywords k
ON k.suggestion_id = s.id
WHERE
k.keyword_prefix = :keyword_prefix
AND (k.keyword_suffix BETWEEN :keyword_suffix AND :keyword_suffix || x'FFFF')
AND s.provider = :provider
AND NOT EXISTS (SELECT 1 FROM dismissed_suggestions WHERE url=s.url)
GROUP BY
s.id,
k.confidence
ORDER BY
s.score DESC,
rank DESC
"#,
named_params! {
":keyword_prefix": keyword_prefix,
":keyword_suffix": keyword_suffix,
":provider": SuggestionProvider::Pocket,
},
|row| -> Result<Option<Suggestion>> {
let title = row.get("title")?;
let raw_url = row.get::<_, String>("url")?;
let score = row.get::<_, f64>("score")?;
let confidence = row.get("confidence")?;
let full_suffix = row.get::<_, String>("keyword_suffix")?;
let suffixes_match = match confidence {
KeywordConfidence::Low => full_suffix.starts_with(keyword_suffix),
KeywordConfidence::High => full_suffix == keyword_suffix,
};
if suffixes_match {
Ok(Some(Suggestion::Pocket {
title,
url: raw_url,
score,
is_top_pick: matches!(confidence, KeywordConfidence::High),
}))
} else {
Ok(None)
}
},
)?
.into_iter()
.flatten()
.take(
query
.limit
.and_then(|limit| usize::try_from(limit).ok())
.unwrap_or(usize::MAX),
)
.collect();
Ok(suggestions)
}
/// Fetches MDN suggestions via prefix keywords, joined with `mdn_custom_details`.
///
/// Rows whose stored keyword suffix does not start with the query suffix are dropped.
pub fn fetch_mdn_suggestions(&self, query: &SuggestionQuery) -> Result<Vec<Suggestion>> {
    let candidates = self.map_prefix_keywords(
        query,
        &SuggestionProvider::Mdn,
        |row, keyword_suffix| -> Result<Option<Suggestion>> {
            let suggestion_id: i64 = row.get("id")?;
            let title = row.get("title")?;
            let raw_url = row.get::<_, String>("url")?;
            let score = row.get::<_, f64>("score")?;
            let full_suffix = row.get::<_, String>("keyword_suffix")?;
            if !full_suffix.starts_with(keyword_suffix) {
                return Ok(None);
            }
            self.conn
                .query_row_and_then(
                    r#"
SELECT
description
FROM
mdn_custom_details
WHERE
suggestion_id = :suggestion_id
"#,
                    named_params! {
                        ":suggestion_id": suggestion_id
                    },
                    |details| {
                        Ok(Suggestion::Mdn {
                            title,
                            url: raw_url,
                            description: details.get("description")?,
                            score,
                        })
                    },
                )
                .map(Some)
        },
    )?;
    Ok(candidates.into_iter().flatten().collect())
}
/// Fetches Fakespot suggestions through a full-text match on `fakespot_fts`.
///
/// Rows come back ordered by the stored `s.score` (descending). Each result's score is
/// then recomputed with `FakespotScore`, and the list is not re-sorted afterwards.
pub fn fetch_fakespot_suggestions(&self, query: &SuggestionQuery) -> Result<Vec<Suggestion>> {
    let fts_query = query.fts_query();
    self.conn.query_rows_and_then_cached(
        r#"
SELECT
s.title,
s.url,
s.score,
f.fakespot_grade,
f.product_id,
f.rating,
f.total_reviews,
i.data,
i.mimetype,
f.keywords,
f.product_type
FROM
suggestions s
JOIN
fakespot_fts fts
ON fts.rowid = s.id
JOIN
fakespot_custom_details f
ON f.suggestion_id = s.id
LEFT JOIN
icons i
ON i.id = f.icon_id
WHERE
fakespot_fts MATCH ?
ORDER BY
s.score DESC
"#,
        (&fts_query,),
        |row| {
            // Columns 9, 10 and 2 are `f.keywords`, `f.product_type` and `s.score`.
            let score = fakespot::FakespotScore::new(
                &query.keyword,
                row.get(9)?,
                row.get(10)?,
                row.get(2)?,
            )
            .as_suggest_score();
            Ok(Suggestion::Fakespot {
                title: row.get(0)?,
                url: row.get(1)?,
                score,
                fakespot_grade: row.get(3)?,
                product_id: row.get(4)?,
                rating: row.get(5)?,
                total_reviews: row.get(6)?,
                icon: row.get(7)?,
                icon_mimetype: row.get(8)?,
            })
        },
    )
}
/// Fetches exposure suggestions for the lowercased query keyword.
///
/// Only types listed in `query.provider_constraints.exposure_suggestion_types` are
/// considered. When that constraint is absent, an empty list is returned. Each
/// distinct matching type yields one suggestion with score 1.0, ordered by type.
pub fn fetch_exposure_suggestions(&self, query: &SuggestionQuery) -> Result<Vec<Suggestion>> {
    let requested_types = match query
        .provider_constraints
        .as_ref()
        .and_then(|c| c.exposure_suggestion_types.as_ref())
    {
        Some(types) => types,
        None => return Ok(vec![]),
    };
    let keyword = query.keyword.to_lowercase();
    // Positional parameters: provider, keyword, then one per requested type.
    let params = rusqlite::params_from_iter(
        std::iter::once(&SuggestionProvider::Exposure as &dyn ToSql)
            .chain(std::iter::once(&keyword as &dyn ToSql))
            .chain(requested_types.iter().map(|t| t as &dyn ToSql)),
    );
    let sql = format!(
        r#"
SELECT DISTINCT
d.type
FROM
suggestions s
JOIN
exposure_custom_details d
ON d.suggestion_id = s.id
JOIN
keywords k
ON k.suggestion_id = s.id
WHERE
s.provider = ?
AND k.keyword = ?
AND d.type IN ({})
ORDER BY
d.type
"#,
        repeat_sql_vars(requested_types.len())
    );
    self.conn
        .query_rows_and_then_cached(&sql, params, |row| -> Result<Suggestion> {
            Ok(Suggestion::Exposure {
                suggestion_type: row.get("type")?,
                score: 1.0,
            })
        })
}
/// Returns whether any suggestion row exists for `record_id`, regardless of provider.
pub fn is_exposure_suggestion_ingested(&self, record_id: &SuggestRecordId) -> Result<bool> {
    let found = self.conn.exists(
        r#"
SELECT
id
FROM
suggestions
WHERE
record_id = :record_id
"#,
        named_params! {
            ":record_id": record_id.as_str(),
        },
    )?;
    Ok(found)
}
/// Inserts AMO suggestions from `record_id`, with their custom details and one prefix
/// keyword per keyword (ranked by position). Checks for interruption before each
/// suggestion.
pub fn insert_amo_suggestions(
    &mut self,
    record_id: &SuggestRecordId,
    suggestions: &[DownloadedAmoSuggestion],
) -> Result<()> {
    let mut suggestion_insert = SuggestionInsertStatement::new(self.conn)?;
    let mut amo_insert = AmoInsertStatement::new(self.conn)?;
    let mut prefix_keyword_insert = PrefixKeywordInsertStatement::new(self.conn)?;
    for amo in suggestions {
        self.scope.err_if_interrupted()?;
        let suggestion_id = suggestion_insert.execute(
            record_id,
            &amo.title,
            &amo.url,
            amo.score,
            SuggestionProvider::Amo,
        )?;
        amo_insert.execute(suggestion_id, amo)?;
        amo.keywords
            .iter()
            .enumerate()
            .try_for_each(|(rank, keyword)| {
                let (prefix, suffix) = split_keyword(keyword);
                prefix_keyword_insert.execute(suggestion_id, None, prefix, suffix, rank)
            })?;
    }
    Ok(())
}
/// Inserts a record's mixed AMP and Wikipedia suggestions.
///
/// Each suggestion gets its provider-specific details plus one keyword row per
/// keyword. Only AMP keywords carry a full keyword; `FullKeywordInserter` reuses the
/// previous full keyword's id when the same value repeats consecutively.
pub fn insert_amp_wikipedia_suggestions(
    &mut self,
    record_id: &SuggestRecordId,
    suggestions: &[DownloadedAmpWikipediaSuggestion],
) -> Result<()> {
    let mut suggestion_insert = SuggestionInsertStatement::new(self.conn)?;
    let mut amp_insert = AmpInsertStatement::new(self.conn)?;
    let mut wiki_insert = WikipediaInsertStatement::new(self.conn)?;
    let mut keyword_insert = KeywordInsertStatement::new(self.conn)?;
    for suggestion in suggestions {
        self.scope.err_if_interrupted()?;
        let common = suggestion.common_details();
        let provider = suggestion.provider();
        let suggestion_id = suggestion_insert.execute(
            record_id,
            &common.title,
            &common.url,
            common.score.unwrap_or(DEFAULT_SUGGESTION_SCORE),
            provider,
        )?;
        let is_amp = match suggestion {
            DownloadedAmpWikipediaSuggestion::Amp(amp) => {
                amp_insert.execute(suggestion_id, amp)?;
                true
            }
            DownloadedAmpWikipediaSuggestion::Wikipedia(wikipedia) => {
                wiki_insert.execute(suggestion_id, wikipedia)?;
                false
            }
        };
        let mut full_keyword_inserter = FullKeywordInserter::new(self.conn, suggestion_id);
        for keyword in common.keywords() {
            let full_keyword_id = match keyword.full_keyword {
                Some(full) if is_amp => Some(full_keyword_inserter.maybe_insert(full)?),
                _ => None,
            };
            keyword_insert.execute(
                suggestion_id,
                keyword.keyword,
                full_keyword_id,
                keyword.rank,
            )?;
        }
    }
    Ok(())
}
/// Inserts AMP mobile suggestions with their AMP details and keyword rows. A keyword's
/// full keyword, when present, is stored via `FullKeywordInserter`.
pub fn insert_amp_mobile_suggestions(
    &mut self,
    record_id: &SuggestRecordId,
    suggestions: &[DownloadedAmpSuggestion],
) -> Result<()> {
    let mut suggestion_insert = SuggestionInsertStatement::new(self.conn)?;
    let mut amp_insert = AmpInsertStatement::new(self.conn)?;
    let mut keyword_insert = KeywordInsertStatement::new(self.conn)?;
    for amp in suggestions {
        self.scope.err_if_interrupted()?;
        let common = &amp.common_details;
        let suggestion_id = suggestion_insert.execute(
            record_id,
            &common.title,
            &common.url,
            common.score.unwrap_or(DEFAULT_SUGGESTION_SCORE),
            SuggestionProvider::AmpMobile,
        )?;
        amp_insert.execute(suggestion_id, amp)?;
        let mut full_keyword_inserter = FullKeywordInserter::new(self.conn, suggestion_id);
        for keyword in common.keywords() {
            let full_keyword_id = match keyword.full_keyword {
                Some(full) => Some(full_keyword_inserter.maybe_insert(full)?),
                None => None,
            };
            keyword_insert.execute(
                suggestion_id,
                keyword.keyword,
                full_keyword_id,
                keyword.rank,
            )?;
        }
    }
    Ok(())
}
/// Inserts Pocket suggestions and their prefix keywords.
///
/// High-confidence keywords are inserted before low-confidence ones. Each list is
/// ranked independently by position, starting at 0.
pub fn insert_pocket_suggestions(
    &mut self,
    record_id: &SuggestRecordId,
    suggestions: &[DownloadedPocketSuggestion],
) -> Result<()> {
    let mut suggestion_insert = SuggestionInsertStatement::new(self.conn)?;
    let mut prefix_keyword_insert = PrefixKeywordInsertStatement::new(self.conn)?;
    for pocket in suggestions {
        self.scope.err_if_interrupted()?;
        let suggestion_id = suggestion_insert.execute(
            record_id,
            &pocket.title,
            &pocket.url,
            pocket.score,
            SuggestionProvider::Pocket,
        )?;
        let high = pocket
            .high_confidence_keywords
            .iter()
            .enumerate()
            .map(|(rank, keyword)| (rank, keyword, KeywordConfidence::High));
        let low = pocket
            .low_confidence_keywords
            .iter()
            .enumerate()
            .map(|(rank, keyword)| (rank, keyword, KeywordConfidence::Low));
        for (rank, keyword, confidence) in high.chain(low) {
            let (keyword_prefix, keyword_suffix) = split_keyword(keyword);
            prefix_keyword_insert.execute(
                suggestion_id,
                Some(confidence as u8),
                keyword_prefix,
                keyword_suffix,
                rank,
            )?;
        }
    }
    Ok(())
}
/// Inserts MDN suggestions from `record_id`, with their descriptions and one prefix
/// keyword per keyword (ranked by position). Checks for interruption before each
/// suggestion.
pub fn insert_mdn_suggestions(
    &mut self,
    record_id: &SuggestRecordId,
    suggestions: &[DownloadedMdnSuggestion],
) -> Result<()> {
    let mut suggestion_insert = SuggestionInsertStatement::new(self.conn)?;
    let mut mdn_insert = MdnInsertStatement::new(self.conn)?;
    let mut prefix_keyword_insert = PrefixKeywordInsertStatement::new(self.conn)?;
    for mdn in suggestions {
        self.scope.err_if_interrupted()?;
        let suggestion_id = suggestion_insert.execute(
            record_id,
            &mdn.title,
            &mdn.url,
            mdn.score,
            SuggestionProvider::Mdn,
        )?;
        mdn_insert.execute(suggestion_id, mdn)?;
        mdn.keywords
            .iter()
            .enumerate()
            .try_for_each(|(rank, keyword)| {
                let (prefix, suffix) = split_keyword(keyword);
                prefix_keyword_insert.execute(suggestion_id, None, prefix, suffix, rank)
            })?;
    }
    Ok(())
}
/// Inserts Fakespot suggestions from `record_id` together with their custom details.
///
/// Like every other bulk insert in this DAO, this checks the interrupt scope before
/// each suggestion, so a long ingest can be cancelled between rows.
///
/// # Errors
/// Returns an error if the scope was interrupted or an insert fails.
pub fn insert_fakespot_suggestions(
    &mut self,
    record_id: &SuggestRecordId,
    suggestions: &[DownloadedFakespotSuggestion],
) -> Result<()> {
    let mut suggestion_insert = SuggestionInsertStatement::new(self.conn)?;
    let mut fakespot_insert = FakespotInsertStatement::new(self.conn)?;
    for suggestion in suggestions {
        self.scope.err_if_interrupted()?;
        let suggestion_id = suggestion_insert.execute(
            record_id,
            &suggestion.title,
            &suggestion.url,
            suggestion.score,
            SuggestionProvider::Fakespot,
        )?;
        fakespot_insert.execute(suggestion_id, suggestion)?;
    }
    Ok(())
}
/// Inserts exposure suggestions of `suggestion_type` from `record_id`.
///
/// Each suggestion gets an empty title and URL and the default score. Its keywords are
/// inserted with `INSERT OR IGNORE`, ranked by position.
pub fn insert_exposure_suggestions(
    &mut self,
    record_id: &SuggestRecordId,
    suggestion_type: &str,
    suggestions: &[DownloadedExposureSuggestion],
) -> Result<()> {
    let mut keyword_insert = KeywordInsertStatement::new_with_or_ignore(self.conn)?;
    let mut suggestion_insert = SuggestionInsertStatement::new(self.conn)?;
    let mut exposure_insert = ExposureInsertStatement::new(self.conn)?;
    for exposure in suggestions {
        self.scope.err_if_interrupted()?;
        let suggestion_id = suggestion_insert.execute(
            record_id,
            "",
            "",
            DEFAULT_SUGGESTION_SCORE,
            SuggestionProvider::Exposure,
        )?;
        exposure_insert.execute(suggestion_id, suggestion_type)?;
        exposure
            .keywords()
            .enumerate()
            .try_for_each(|(rank, keyword)| {
                keyword_insert.execute(suggestion_id, &keyword, None, rank)
            })?;
    }
    Ok(())
}
/// Inserts the icon `icon_id`, or replaces an existing one, with `data` and `mimetype`.
pub fn put_icon(&mut self, icon_id: &str, data: &[u8], mimetype: &str) -> Result<()> {
    let _rows_written = self.conn.execute(
        "INSERT OR REPLACE INTO icons(
id,
data,
mimetype
)
VALUES(
:id,
:data,
:mimetype
)",
        named_params! {
            ":mimetype": mimetype,
            ":data": data,
            ":id": icon_id,
        },
    )?;
    Ok(())
}
/// Records `url` as dismissed; dismissing an already-dismissed URL is a no-op
/// (`INSERT OR IGNORE`).
pub fn insert_dismissal(&self, url: &str) -> Result<()> {
    let _rows_written = self.conn.execute(
        "INSERT OR IGNORE INTO dismissed_suggestions(url)
VALUES(:url)",
        named_params! { ":url": url },
    )?;
    Ok(())
}
/// Deletes every recorded dismissal.
pub fn clear_dismissals(&self) -> Result<()> {
    let _rows_deleted = self.conn.execute("DELETE FROM dismissed_suggestions", ())?;
    Ok(())
}
/// Deletes all suggestion-related rows ingested from `record_id`, then resets the
/// weather and geoname caches.
///
/// The interrupt scope is checked before every statement.
pub fn drop_suggestions(&mut self, record_id: &SuggestRecordId) -> Result<()> {
    // Order matters: the keyword tables and the FTS index select their rows through a
    // subquery on `suggestions`, so they must run before `suggestions` is cleared.
    const DELETE_STATEMENTS: &[&str] = &[
        "DELETE FROM keywords WHERE suggestion_id IN (SELECT id from suggestions WHERE record_id = :record_id)",
        "DELETE FROM full_keywords WHERE suggestion_id IN (SELECT id from suggestions WHERE record_id = :record_id)",
        "DELETE FROM prefix_keywords WHERE suggestion_id IN (SELECT id from suggestions WHERE record_id = :record_id)",
        "DELETE FROM keywords_metrics WHERE record_id = :record_id",
        "
DELETE FROM fakespot_fts
WHERE rowid IN (SELECT id from suggestions WHERE record_id = :record_id)
",
        "DELETE FROM suggestions WHERE record_id = :record_id",
        "DELETE FROM yelp_subjects WHERE record_id = :record_id",
        "DELETE FROM yelp_modifiers WHERE record_id = :record_id",
        "DELETE FROM yelp_location_signs WHERE record_id = :record_id",
        "DELETE FROM yelp_custom_details WHERE record_id = :record_id",
        "DELETE FROM geonames WHERE record_id = :record_id",
        "DELETE FROM geonames_metrics WHERE record_id = :record_id",
    ];
    for &sql in DELETE_STATEMENTS {
        self.scope.err_if_interrupted()?;
        self.conn
            .execute_cached(sql, named_params! { ":record_id": record_id.as_str() })?;
    }
    self.weather_cache.take();
    self.geoname_cache.take();
    Ok(())
}
/// Deletes the icon with id `icon_id`, if present.
pub fn drop_icon(&mut self, icon_id: &str) -> Result<()> {
    let _rows_deleted = self
        .conn
        .execute_cached("DELETE FROM icons WHERE id = :id", named_params! { ":id": icon_id })?;
    Ok(())
}
/// Clears the database through the schema module's `clear_database`.
pub fn clear(&mut self) -> Result<()> {
    clear_database(self.conn)?;
    Ok(())
}
/// Reads the `meta` value stored under `key`, or `None` when the key is absent.
pub fn get_meta<T: FromSql>(&self, key: &str) -> Result<Option<T>> {
    // NOTE(review): the trailing `true` presumably enables statement caching in
    // sql_support's `try_query_one` — confirm.
    let value: Option<T> = self.conn.try_query_one(
        "SELECT value FROM meta WHERE key = :key",
        named_params! { ":key": key },
        true,
    )?;
    Ok(value)
}
/// Stores `value` under `key` in the `meta` table, replacing any previous value.
pub fn put_meta(&mut self, key: &str, value: impl ToSql) -> Result<()> {
    let _rows_written = self.conn.execute_cached(
        "INSERT OR REPLACE INTO meta(key, value) VALUES(:key, :value)",
        named_params! {
            ":key": key,
            ":value": value,
        },
    )?;
    Ok(())
}
/// Stores the global config as JSON under `GLOBAL_CONFIG_META_KEY`.
pub fn put_global_config(&mut self, config: &SuggestGlobalConfig) -> Result<()> {
    let json = serde_json::to_string(config)?;
    self.put_meta(GLOBAL_CONFIG_META_KEY, json)
}
/// Loads the global config, falling back to `SuggestGlobalConfig::default()` when
/// none has been stored.
pub fn get_global_config(&self) -> Result<SuggestGlobalConfig> {
    match self.get_meta::<String>(GLOBAL_CONFIG_META_KEY)? {
        Some(json) => Ok(serde_json::from_str(&json)?),
        None => Ok(SuggestGlobalConfig::default()),
    }
}
/// Stores `config` as JSON under the `meta` key for `provider`.
pub fn put_provider_config(
    &mut self,
    provider: SuggestionProvider,
    config: &SuggestProviderConfig,
) -> Result<()> {
    let key = provider_config_meta_key(provider);
    let json = serde_json::to_string(config)?;
    self.put_meta(&key, json)
}
/// Loads `provider`'s config.
///
/// Returns `None` when no config is stored. The stored JSON is deserialized as an
/// `Option`, so a stored JSON `null` also yields `None`.
pub fn get_provider_config(
    &self,
    provider: SuggestionProvider,
) -> Result<Option<SuggestProviderConfig>> {
    let key = provider_config_meta_key(provider);
    match self.get_meta::<String>(&key)? {
        Some(json) => Ok(serde_json::from_str(&json)?),
        None => Ok(None),
    }
}
}
/// One row of the `ingested_records` bookkeeping table.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct IngestedRecord {
    pub id: SuggestRecordId,
    pub collection: String,
    pub record_type: String,
    pub last_modified: u64,
}

impl IngestedRecord {
    /// Builds a record from a row with `id`, `collection`, `type` and `last_modified` columns.
    fn from_row(row: &rusqlite::Row) -> Result<Self> {
        let id = SuggestRecordId::new(row.get("id")?);
        let collection = row.get("collection")?;
        let record_type = row.get("type")?;
        let last_modified = row.get("last_modified")?;
        Ok(Self {
            id,
            collection,
            record_type,
            last_modified,
        })
    }
}
/// Inserts `full_keywords` rows for a single suggestion.
///
/// When the same full keyword is requested twice in a row, the id of the previous
/// insert is reused instead of inserting a duplicate.
struct FullKeywordInserter<'a> {
    conn: &'a Connection,
    suggestion_id: i64,
    last_inserted: Option<(&'a str, i64)>,
}

impl<'a> FullKeywordInserter<'a> {
    fn new(conn: &'a Connection, suggestion_id: i64) -> Self {
        Self {
            conn,
            suggestion_id,
            last_inserted: None,
        }
    }

    /// Returns the row id for `full_keyword`. A new row is inserted unless the keyword
    /// equals the most recently inserted one.
    fn maybe_insert(&mut self, full_keyword: &'a str) -> rusqlite::Result<i64> {
        if let Some((_, id)) = self
            .last_inserted
            .filter(|(previous, _)| *previous == full_keyword)
        {
            return Ok(id);
        }
        let full_keyword_id = self.conn.query_row_and_then(
            "INSERT INTO full_keywords(
suggestion_id,
full_keyword
)
VALUES(
:suggestion_id,
:keyword
)
RETURNING id",
            named_params! {
                ":keyword": full_keyword,
                ":suggestion_id": self.suggestion_id,
            },
            |row| row.get(0),
        )?;
        self.last_inserted = Some((full_keyword, full_keyword_id));
        Ok(full_keyword_id)
    }
}
/// Prepared insert into `suggestions` that returns the new row id.
pub(crate) struct SuggestionInsertStatement<'conn>(rusqlite::Statement<'conn>);

impl<'conn> SuggestionInsertStatement<'conn> {
    pub(crate) fn new(conn: &'conn Connection) -> Result<Self> {
        let stmt = conn.prepare(
            "INSERT INTO suggestions(
record_id,
title,
url,
score,
provider
)
VALUES(?, ?, ?, ?, ?)
RETURNING id",
        )?;
        Ok(Self(stmt))
    }

    /// Inserts one suggestion and returns its row id. The provider is stored as its
    /// `u8` value.
    pub(crate) fn execute(
        &mut self,
        record_id: &SuggestRecordId,
        title: &str,
        url: &str,
        score: f64,
        provider: SuggestionProvider,
    ) -> Result<i64> {
        let params = (record_id.as_str(), title, url, score, provider as u8);
        self.0
            .query_row(params, |row| row.get(0))
            .with_context("suggestion insert")
    }
}
struct AmpInsertStatement<'conn>(rusqlite::Statement<'conn>);
impl<'conn> AmpInsertStatement<'conn> {
fn new(conn: &'conn Connection) -> Result<Self> {
Ok(Self(conn.prepare(
"INSERT INTO amp_custom_details(
suggestion_id,
advertiser,
block_id,
iab_category,
impression_url,
click_url,
icon_id
)
VALUES(?, ?, ?, ?, ?, ?, ?)
",
)?))
}
fn execute(&mut self, suggestion_id: i64, amp: &DownloadedAmpSuggestion) -> Result<()> {
self.0
.execute((
suggestion_id,
&.advertiser,
amp.block_id,
&.iab_category,
&.impression_url,
&.click_url,
&.icon_id,
))
.with_context("amp insert")?;
Ok(())
}
}
/// Prepared insert into `wikipedia_custom_details` (a suggestion's icon id).
struct WikipediaInsertStatement<'conn>(rusqlite::Statement<'conn>);

impl<'conn> WikipediaInsertStatement<'conn> {
    fn new(conn: &'conn Connection) -> Result<Self> {
        let stmt = conn.prepare(
            "INSERT INTO wikipedia_custom_details(
suggestion_id,
icon_id
)
VALUES(?, ?)
",
        )?;
        Ok(Self(stmt))
    }

    /// Inserts the icon id of `wikipedia` for the suggestion row `suggestion_id`.
    fn execute(
        &mut self,
        suggestion_id: i64,
        wikipedia: &DownloadedWikipediaSuggestion,
    ) -> Result<()> {
        self.0
            .execute((suggestion_id, &wikipedia.icon_id))
            .with_context("wikipedia insert")
            .map(|_| ())
    }
}
/// Prepared insert into `amo_custom_details`.
struct AmoInsertStatement<'conn>(rusqlite::Statement<'conn>);

impl<'conn> AmoInsertStatement<'conn> {
    fn new(conn: &'conn Connection) -> Result<Self> {
        let stmt = conn.prepare(
            "INSERT INTO amo_custom_details(
suggestion_id,
description,
guid,
icon_url,
rating,
number_of_ratings
)
VALUES(?, ?, ?, ?, ?, ?)
",
        )?;
        Ok(Self(stmt))
    }

    /// Inserts the AMO details of `amo` for the suggestion row `suggestion_id`.
    fn execute(&mut self, suggestion_id: i64, amo: &DownloadedAmoSuggestion) -> Result<()> {
        let params = (
            suggestion_id,
            &amo.description,
            &amo.guid,
            &amo.icon_url,
            &amo.rating,
            amo.number_of_ratings,
        );
        self.0
            .execute(params)
            .with_context("amo insert")
            .map(|_| ())
    }
}
/// Prepared insert into `mdn_custom_details` (a suggestion's description).
struct MdnInsertStatement<'conn>(rusqlite::Statement<'conn>);

impl<'conn> MdnInsertStatement<'conn> {
    fn new(conn: &'conn Connection) -> Result<Self> {
        let stmt = conn.prepare(
            "INSERT INTO mdn_custom_details(
suggestion_id,
description
)
VALUES(?, ?)
",
        )?;
        Ok(Self(stmt))
    }

    /// Inserts the description of `mdn` for the suggestion row `suggestion_id`.
    fn execute(&mut self, suggestion_id: i64, mdn: &DownloadedMdnSuggestion) -> Result<()> {
        self.0
            .execute((suggestion_id, &mdn.description))
            .with_context("mdn insert")
            .map(|_| ())
    }
}
/// Prepared insert into `fakespot_custom_details`.
struct FakespotInsertStatement<'conn>(rusqlite::Statement<'conn>);

impl<'conn> FakespotInsertStatement<'conn> {
    fn new(conn: &'conn Connection) -> Result<Self> {
        let stmt = conn.prepare(
            "INSERT INTO fakespot_custom_details(
suggestion_id,
fakespot_grade,
product_id,
keywords,
product_type,
rating,
total_reviews,
icon_id
)
VALUES(?, ?, ?, ?, ?, ?, ?, ?)
",
        )?;
        Ok(Self(stmt))
    }

    /// Inserts the Fakespot details for the suggestion row `suggestion_id`.
    ///
    /// Keywords and product type are stored lowercased. The icon id is
    /// `fakespot-<vendor>`, where vendor is the part of `product_id` before its first
    /// `-`. There is no icon when `product_id` contains no `-`.
    fn execute(
        &mut self,
        suggestion_id: i64,
        fakespot: &DownloadedFakespotSuggestion,
    ) -> Result<()> {
        let vendor = fakespot.product_id.split_once('-').map(|(vendor, _)| vendor);
        let icon_id = vendor.map(|vendor| format!("fakespot-{vendor}"));
        let keywords = fakespot.keywords.to_lowercase();
        let product_type = fakespot.product_type.to_lowercase();
        self.0
            .execute((
                suggestion_id,
                &fakespot.fakespot_grade,
                &fakespot.product_id,
                &keywords,
                &product_type,
                fakespot.rating,
                fakespot.total_reviews,
                icon_id,
            ))
            .with_context("fakespot insert")
            .map(|_| ())
    }
}
/// Prepared insert into `exposure_custom_details` (a suggestion's exposure type).
struct ExposureInsertStatement<'conn>(rusqlite::Statement<'conn>);

impl<'conn> ExposureInsertStatement<'conn> {
    fn new(conn: &'conn Connection) -> Result<Self> {
        let stmt = conn.prepare(
            "INSERT INTO exposure_custom_details(
suggestion_id,
type
)
VALUES(?, ?)
",
        )?;
        Ok(Self(stmt))
    }

    /// Records `suggestion_type` for the suggestion row `suggestion_id`.
    fn execute(&mut self, suggestion_id: i64, suggestion_type: &str) -> Result<()> {
        self.0
            .execute((suggestion_id, suggestion_type))
            .with_context("exposure insert")
            .map(|_| ())
    }
}
/// Prepared insert into `keywords`.
pub(crate) struct KeywordInsertStatement<'conn>(rusqlite::Statement<'conn>);

impl<'conn> KeywordInsertStatement<'conn> {
    /// Prepares a plain `INSERT INTO keywords` statement.
    pub(crate) fn new(conn: &'conn Connection) -> Result<Self> {
        Self::prepare_with_verb(conn, "INSERT INTO")
    }

    /// Prepares an `INSERT OR IGNORE INTO keywords` statement.
    pub(crate) fn new_with_or_ignore(conn: &'conn Connection) -> Result<Self> {
        Self::prepare_with_verb(conn, "INSERT OR IGNORE INTO")
    }

    /// Shared constructor; `verb` is the leading `INSERT ... INTO` clause.
    fn prepare_with_verb(conn: &'conn Connection, verb: &str) -> Result<Self> {
        let sql = format!(
            "{verb} keywords(
suggestion_id,
keyword,
full_keyword_id,
rank
)
VALUES(?, ?, ?, ?)
"
        );
        Ok(Self(conn.prepare(&sql)?))
    }

    /// Inserts one keyword row for `suggestion_id`.
    pub(crate) fn execute(
        &mut self,
        suggestion_id: i64,
        keyword: &str,
        full_keyword_id: Option<i64>,
        rank: usize,
    ) -> Result<()> {
        self.0
            .execute((suggestion_id, keyword, full_keyword_id, rank))
            .with_context("keyword insert")
            .map(|_| ())
    }
}
/// Prepared insert into `prefix_keywords`.
struct PrefixKeywordInsertStatement<'conn>(rusqlite::Statement<'conn>);

impl<'conn> PrefixKeywordInsertStatement<'conn> {
    fn new(conn: &'conn Connection) -> Result<Self> {
        let stmt = conn.prepare(
            "INSERT INTO prefix_keywords(
suggestion_id,
confidence,
keyword_prefix,
keyword_suffix,
rank
)
VALUES(?, ?, ?, ?, ?)
",
        )?;
        Ok(Self(stmt))
    }

    /// Inserts one prefix-keyword row. A missing `confidence` is stored as 0.
    fn execute(
        &mut self,
        suggestion_id: i64,
        confidence: Option<u8>,
        keyword_prefix: &str,
        keyword_suffix: &str,
        rank: usize,
    ) -> Result<()> {
        let stored_confidence = confidence.unwrap_or(0);
        self.0
            .execute((
                suggestion_id,
                stored_confidence,
                keyword_prefix,
                keyword_suffix,
                rank,
            ))
            .with_context("prefix keyword insert")
            .map(|_| ())
    }
}
/// Prepared insert into `keywords_metrics` (per-record keyword length statistics).
pub(crate) struct KeywordMetricsInsertStatement<'conn>(rusqlite::Statement<'conn>);

impl<'conn> KeywordMetricsInsertStatement<'conn> {
    pub(crate) fn new(conn: &'conn Connection) -> Result<Self> {
        let stmt = conn.prepare(
            "INSERT INTO keywords_metrics(
record_id,
provider,
max_length,
max_word_count
)
VALUES(?, ?, ?, ?)
",
        )?;
        Ok(Self(stmt))
    }

    /// Inserts the maximum keyword length and word count for `record_id` and `provider`.
    pub(crate) fn execute(
        &mut self,
        record_id: &SuggestRecordId,
        provider: SuggestionProvider,
        max_len: usize,
        max_word_count: usize,
    ) -> Result<()> {
        let params = (record_id.as_str(), provider, max_len, max_word_count);
        self.0
            .execute(params)
            .with_context("keyword metrics insert")
            .map(|_| ())
    }
}
/// Builds the `meta` key for `provider`'s config: `PROVIDER_CONFIG_META_KEY_PREFIX`
/// followed by the provider's `u8` value.
fn provider_config_meta_key(provider: SuggestionProvider) -> String {
    let provider_id = provider as u8;
    format!("{}{}", PROVIDER_CONFIG_META_KEY_PREFIX, provider_id)
}