suggest/
db.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
4 */
5
6use std::{cell::OnceCell, path::Path, sync::Arc};
7
8use interrupt_support::{SqlInterruptHandle, SqlInterruptScope};
9use parking_lot::{Mutex, MutexGuard};
10use rusqlite::{
11    named_params,
12    types::{FromSql, ToSql},
13    Connection,
14};
15use sql_support::{open_database, repeat_sql_vars, ConnExt};
16
17use crate::{
18    config::{SuggestGlobalConfig, SuggestProviderConfig},
19    error::RusqliteResultExt,
20    fakespot,
21    geoname::GeonameCache,
22    provider::{AmpMatchingStrategy, SuggestionProvider},
23    query::{full_keywords_to_fts_content, FtsQuery},
24    rs::{
25        DownloadedAmoSuggestion, DownloadedAmpSuggestion, DownloadedDynamicRecord,
26        DownloadedDynamicSuggestion, DownloadedFakespotSuggestion, DownloadedMdnSuggestion,
27        DownloadedWikipediaSuggestion, Record, SuggestRecordId, SuggestRecordType,
28    },
29    schema::{clear_database, SuggestConnectionInitializer},
30    suggestion::{cook_raw_suggestion_url, FtsMatchInfo, Suggestion},
31    util::{full_keyword, i18n_transform, split_keyword},
32    weather::WeatherCache,
33    Result, SuggestionQuery,
34};
35
36/// The metadata key whose value is a JSON string encoding a
37/// `SuggestGlobalConfig`, which contains global Suggest configuration data.
38pub const GLOBAL_CONFIG_META_KEY: &str = "global_config";
39/// Prefix of metadata keys whose values are JSON strings encoding
40/// `SuggestProviderConfig`, which contains per-provider configuration data. The
41/// full key is this prefix plus the `SuggestionProvider` value as a u8.
42pub const PROVIDER_CONFIG_META_KEY_PREFIX: &str = "provider_config_";
43
44// Default value when Suggestion does not have a value for score
45pub const DEFAULT_SUGGESTION_SCORE: f64 = 0.2;
46
47/// The database connection type.
48#[derive(Clone, Copy)]
49pub(crate) enum ConnectionType {
50    ReadOnly,
51    ReadWrite,
52}
53
54#[derive(Default, Clone)]
55pub struct Sqlite3Extension {
56    pub library: String,
57    pub entry_point: Option<String>,
58}
59
60/// A thread-safe wrapper around an SQLite connection to the Suggest database,
61/// and its interrupt handle.
62pub(crate) struct SuggestDb {
63    pub conn: Mutex<Connection>,
64
65    /// An object that's used to interrupt an ongoing database operation.
66    ///
67    /// When this handle is interrupted, the thread that's currently accessing
68    /// the database will be told to stop and release the `conn` lock as soon
69    /// as possible.
70    pub interrupt_handle: Arc<SqlInterruptHandle>,
71}
72
73impl SuggestDb {
74    /// Opens a read-only or read-write connection to a Suggest database at the
75    /// given path.
76    pub fn open(
77        path: impl AsRef<Path>,
78        extensions_to_load: &[Sqlite3Extension],
79        type_: ConnectionType,
80    ) -> Result<Self> {
81        let conn = open_database::open_database_with_flags(
82            path,
83            match type_ {
84                ConnectionType::ReadWrite => open_database::read_write_flags(),
85                ConnectionType::ReadOnly => open_database::read_only_flags(),
86            },
87            &SuggestConnectionInitializer::new(extensions_to_load),
88        )?;
89        Ok(Self::with_connection(conn))
90    }
91
92    fn with_connection(conn: Connection) -> Self {
93        let interrupt_handle = Arc::new(SqlInterruptHandle::new(&conn));
94        Self {
95            conn: Mutex::new(conn),
96            interrupt_handle,
97        }
98    }
99
100    /// Accesses the Suggest database for reading.
101    pub fn read<T>(&self, op: impl FnOnce(&SuggestDao) -> Result<T>) -> Result<T> {
102        let conn = self.conn.lock();
103        let scope = self.interrupt_handle.begin_interrupt_scope()?;
104        let dao = SuggestDao::new(&conn, &scope);
105        op(&dao)
106    }
107
108    /// Accesses the Suggest database in a transaction for reading and writing.
109    pub fn write<T>(&self, op: impl FnOnce(&mut SuggestDao) -> Result<T>) -> Result<T> {
110        let mut conn = self.conn.lock();
111        let scope = self.interrupt_handle.begin_interrupt_scope()?;
112        let tx = conn.transaction()?;
113        let mut dao = SuggestDao::new(&tx, &scope);
114        let result = op(&mut dao)?;
115        tx.commit()?;
116        Ok(result)
117    }
118
119    /// Create a new write scope.
120    ///
121    /// This enables performing multiple `write()` calls with the same shared interrupt scope.
122    /// This is important for things like ingestion, where you want the operation to be interrupted
123    /// if [Self::interrupt_handle::interrupt] is called after the operation starts.  Calling
124    /// [Self::write] multiple times during the operation risks missing a call that happens after
125    /// between those calls.
126    pub fn write_scope(&self) -> Result<WriteScope> {
127        Ok(WriteScope {
128            conn: self.conn.lock(),
129            scope: self.interrupt_handle.begin_interrupt_scope()?,
130        })
131    }
132}
133
134pub(crate) struct WriteScope<'a> {
135    pub conn: MutexGuard<'a, Connection>,
136    pub scope: SqlInterruptScope,
137}
138
139impl WriteScope<'_> {
140    /// Accesses the Suggest database in a transaction for reading and writing.
141    pub fn write<T>(&mut self, op: impl FnOnce(&mut SuggestDao) -> Result<T>) -> Result<T> {
142        let tx = self.conn.transaction()?;
143        let mut dao = SuggestDao::new(&tx, &self.scope);
144        let result = op(&mut dao)?;
145        tx.commit()?;
146        Ok(result)
147    }
148
149    /// Accesses the Suggest database in a transaction for reading only
150    pub fn read<T>(&mut self, op: impl FnOnce(&SuggestDao) -> Result<T>) -> Result<T> {
151        let tx = self.conn.transaction()?;
152        let dao = SuggestDao::new(&tx, &self.scope);
153        let result = op(&dao)?;
154        tx.commit()?;
155        Ok(result)
156    }
157
158    pub fn err_if_interrupted(&self) -> Result<()> {
159        Ok(self.scope.err_if_interrupted()?)
160    }
161}
162
163/// A data access object (DAO) that wraps a connection to the Suggest database
164/// with methods for reading and writing suggestions, icons, and metadata.
165///
166/// Methods that only read from the database take an immutable reference to
167/// `self` (`&self`), and methods that write to the database take a mutable
168/// reference (`&mut self`).
169pub(crate) struct SuggestDao<'a> {
170    pub conn: &'a Connection,
171    pub scope: &'a SqlInterruptScope,
172    pub weather_cache: OnceCell<WeatherCache>,
173    pub geoname_cache: OnceCell<GeonameCache>,
174}
175
176impl<'a> SuggestDao<'a> {
177    fn new(conn: &'a Connection, scope: &'a SqlInterruptScope) -> Self {
178        Self {
179            conn,
180            scope,
181            weather_cache: std::cell::OnceCell::new(),
182            geoname_cache: std::cell::OnceCell::new(),
183        }
184    }
185
186    // =============== High level API ===============
187    //
188    //  These methods combine several low-level calls into one logical operation.
189
190    pub fn delete_record_data(&mut self, record_id: &SuggestRecordId) -> Result<()> {
191        // Drop either the icon or suggestions, records only contain one or the other
192        match record_id.as_icon_id() {
193            Some(icon_id) => self.drop_icon(icon_id)?,
194            None => self.drop_suggestions(record_id)?,
195        };
196        Ok(())
197    }
198
199    // =============== Low level API ===============
200    //
201    //  These methods implement CRUD operations
202
203    pub fn get_ingested_records(&self) -> Result<Vec<IngestedRecord>> {
204        let mut stmt = self
205            .conn
206            .prepare_cached("SELECT id, collection, type, last_modified FROM ingested_records")?;
207        let rows = stmt.query_and_then((), IngestedRecord::from_row)?;
208        rows.collect()
209    }
210
211    pub fn update_ingested_records(
212        &mut self,
213        collection: &str,
214        new_records: &[&Record],
215        updated_records: &[&Record],
216        deleted_records: &[&IngestedRecord],
217    ) -> Result<()> {
218        let mut delete_stmt = self
219            .conn
220            .prepare_cached("DELETE FROM ingested_records WHERE collection = ? AND id = ?")?;
221        for deleted in deleted_records {
222            delete_stmt.execute((collection, deleted.id.as_str()))?;
223        }
224
225        let mut insert_stmt = self.conn.prepare_cached(
226            "INSERT OR REPLACE INTO ingested_records(id, collection, type, last_modified) VALUES(?, ?, ?, ?)",
227        )?;
228        for record in new_records.iter().chain(updated_records) {
229            insert_stmt.execute((
230                record.id.as_str(),
231                collection,
232                record.record_type().as_str(),
233                record.last_modified,
234            ))?;
235        }
236        Ok(())
237    }
238
239    /// Update the DB so that we re-ingest all records on the next ingestion.
240    ///
241    /// We hack this by setting the last_modified time to 1 so that the next time around we always
242    /// re-ingest the record.
243    pub fn force_reingest(&mut self) -> Result<()> {
244        self.conn
245            .prepare_cached("UPDATE ingested_records SET last_modified=1")?
246            .execute(())?;
247        Ok(())
248    }
249
250    pub fn suggestions_table_empty(&self) -> Result<bool> {
251        Ok(self
252            .conn
253            .conn_ext_query_one::<bool>("SELECT NOT EXISTS (SELECT 1 FROM suggestions)")?)
254    }
255
256    /// Fetches Suggestions of type Amp provider that match the given query
257    pub fn fetch_amp_suggestions(&self, query: &SuggestionQuery) -> Result<Vec<Suggestion>> {
258        let strategy = query
259            .provider_constraints
260            .as_ref()
261            .and_then(|c| c.amp_alternative_matching.as_ref());
262        match strategy {
263            None => self.fetch_amp_suggestions_using_keywords(query, true),
264            Some(AmpMatchingStrategy::NoKeywordExpansion) => {
265                self.fetch_amp_suggestions_using_keywords(query, false)
266            }
267            Some(AmpMatchingStrategy::FtsAgainstFullKeywords) => {
268                self.fetch_amp_suggestions_using_fts(query, "full_keywords")
269            }
270            Some(AmpMatchingStrategy::FtsAgainstTitle) => {
271                self.fetch_amp_suggestions_using_fts(query, "title")
272            }
273        }
274    }
275
276    pub fn fetch_amp_suggestions_using_keywords(
277        &self,
278        query: &SuggestionQuery,
279        allow_keyword_expansion: bool,
280    ) -> Result<Vec<Suggestion>> {
281        let keyword_lowercased = &query.keyword.to_lowercase();
282        let where_extra = if allow_keyword_expansion {
283            ""
284        } else {
285            "AND INSTR(CONCAT(fk.full_keyword, ' '), k.keyword) != 0"
286        };
287        let suggestions = self.conn.query_rows_and_then_cached(
288            &format!(
289                r#"
290                SELECT
291                  s.id,
292                  k.rank,
293                  s.title,
294                  s.url,
295                  s.provider,
296                  s.score,
297                  fk.full_keyword
298                FROM
299                  suggestions s
300                JOIN
301                  keywords k
302                  ON k.suggestion_id = s.id
303                LEFT JOIN
304                  full_keywords fk
305                  ON k.full_keyword_id = fk.id
306                WHERE
307                  s.provider = :provider
308                  AND k.keyword = :keyword
309                  {where_extra}
310                  AND NOT EXISTS (
311                    -- For AMP suggestions dismissed with the deprecated URL-based dismissal API,
312                    -- `dismissed_suggestions.url` will be the suggestion URL. With the new
313                    -- `Suggestion`-based API, it will be the full keyword.
314                    SELECT 1 FROM dismissed_suggestions WHERE url IN (fk.full_keyword, s.url)
315                  )
316                "#
317            ),
318            named_params! {
319                ":keyword": keyword_lowercased,
320                ":provider": SuggestionProvider::Amp,
321            },
322            |row| -> Result<Suggestion> {
323                let suggestion_id: i64 = row.get("id")?;
324                let title = row.get("title")?;
325                let raw_url: String = row.get("url")?;
326                let score: f64 = row.get("score")?;
327                let full_keyword_from_db: Option<String> = row.get("full_keyword")?;
328
329                self.conn.query_row_and_then(
330                    r#"
331                    SELECT
332                      amp.advertiser,
333                      amp.block_id,
334                      amp.iab_category,
335                      amp.impression_url,
336                      amp.click_url,
337                      i.data AS icon,
338                      i.mimetype AS icon_mimetype
339                    FROM
340                      amp_custom_details amp
341                    LEFT JOIN
342                      icons i ON amp.icon_id = i.id
343                    WHERE
344                      amp.suggestion_id = :suggestion_id
345                    "#,
346                    named_params! {
347                        ":suggestion_id": suggestion_id
348                    },
349                    |row| {
350                        let cooked_url = cook_raw_suggestion_url(&raw_url);
351                        let raw_click_url = row.get::<_, String>("click_url")?;
352                        let cooked_click_url = cook_raw_suggestion_url(&raw_click_url);
353
354                        let categories = self.fetch_categories_for_suggestion(suggestion_id)?;
355
356                        Ok(Suggestion::Amp {
357                            block_id: row.get("block_id")?,
358                            advertiser: row.get("advertiser")?,
359                            iab_category: row.get("iab_category")?,
360                            categories,
361                            title,
362                            url: cooked_url,
363                            raw_url,
364                            full_keyword: full_keyword_from_db.unwrap_or_default(),
365                            icon: row.get("icon")?,
366                            icon_mimetype: row.get("icon_mimetype")?,
367                            impression_url: row.get("impression_url")?,
368                            click_url: cooked_click_url,
369                            raw_click_url,
370                            score,
371                            fts_match_info: None,
372                        })
373                    },
374                )
375            },
376        )?;
377        Ok(suggestions)
378    }
379
380    pub fn fetch_amp_suggestions_using_fts(
381        &self,
382        query: &SuggestionQuery,
383        fts_column: &str,
384    ) -> Result<Vec<Suggestion>> {
385        let fts_query = query.fts_query();
386        let match_arg = &fts_query.match_arg;
387        let suggestions = self.conn.query_rows_and_then_cached(
388            &format!(
389                r#"
390                SELECT
391                  s.id,
392                  s.title,
393                  s.url,
394                  s.provider,
395                  s.score
396                FROM
397                  suggestions s
398                JOIN
399                  amp_fts fts
400                  ON fts.rowid = s.id
401                WHERE
402                  s.provider = :provider
403                  AND amp_fts match '{fts_column}: {match_arg}'
404                AND NOT EXISTS (SELECT 1 FROM dismissed_suggestions WHERE url=s.url)
405                ORDER BY rank
406                LIMIT 1
407                "#
408            ),
409            named_params! {
410                ":provider": SuggestionProvider::Amp,
411            },
412            |row| -> Result<Suggestion> {
413                let suggestion_id: i64 = row.get("id")?;
414                let title: String = row.get("title")?;
415                let raw_url: String = row.get("url")?;
416                let score: f64 = row.get("score")?;
417
418                self.conn.query_row_and_then(
419                    r#"
420                    SELECT
421                      amp.advertiser,
422                      amp.block_id,
423                      amp.iab_category,
424                      amp.impression_url,
425                      amp.click_url,
426                      i.data AS icon,
427                      i.mimetype AS icon_mimetype
428                    FROM
429                      amp_custom_details amp
430                    LEFT JOIN
431                      icons i ON amp.icon_id = i.id
432                    WHERE
433                      amp.suggestion_id = :suggestion_id
434                    "#,
435                    named_params! {
436                        ":suggestion_id": suggestion_id
437                    },
438                    |row| {
439                        let cooked_url = cook_raw_suggestion_url(&raw_url);
440                        let raw_click_url = row.get::<_, String>("click_url")?;
441                        let cooked_click_url = cook_raw_suggestion_url(&raw_click_url);
442                        let match_info = self.fetch_amp_fts_match_info(
443                            &fts_query,
444                            suggestion_id,
445                            fts_column,
446                            &title,
447                        )?;
448
449                        let categories = self.fetch_categories_for_suggestion(suggestion_id)?;
450
451                        Ok(Suggestion::Amp {
452                            block_id: row.get("block_id")?,
453                            advertiser: row.get("advertiser")?,
454                            iab_category: row.get("iab_category")?,
455                            categories,
456                            title,
457                            url: cooked_url,
458                            raw_url,
459                            full_keyword: query.keyword.clone(),
460                            icon: row.get("icon")?,
461                            icon_mimetype: row.get("icon_mimetype")?,
462                            impression_url: row.get("impression_url")?,
463                            click_url: cooked_click_url,
464                            raw_click_url,
465                            score,
466                            fts_match_info: Some(match_info),
467                        })
468                    },
469                )
470            },
471        )?;
472        Ok(suggestions)
473    }
474
475    fn fetch_amp_fts_match_info(
476        &self,
477        fts_query: &FtsQuery<'_>,
478        suggestion_id: i64,
479        fts_column: &str,
480        title: &str,
481    ) -> Result<FtsMatchInfo> {
482        let fts_content = match fts_column {
483            "title" => title.to_lowercase(),
484            "full_keywords" => {
485                let full_keyword_list: Vec<String> = self.conn.query_rows_and_then(
486                    "
487                    SELECT fk.full_keyword
488                    FROM full_keywords fk
489                    JOIN keywords k on fk.id == k.full_keyword_id
490                    WHERE k.suggestion_id = ?
491                    ",
492                    (suggestion_id,),
493                    |row| row.get(0),
494                )?;
495                full_keywords_to_fts_content(full_keyword_list.iter().map(String::as_str))
496            }
497            // fts_column comes from the code above and we know there's only 2 possibilities
498            _ => unreachable!(),
499        };
500
501        let prefix = if fts_query.is_prefix_query {
502            // If the query was a prefix match query then test if the query without the prefix
503            // match would have also matched.  If not, then this counts as a prefix match.
504            let sql = "SELECT 1 FROM amp_fts WHERE rowid = ? AND amp_fts MATCH ?";
505            let params = (&suggestion_id, &fts_query.match_arg_without_prefix_match);
506            !self.conn.exists(sql, params)?
507        } else {
508            // If not, then it definitely wasn't a prefix match
509            false
510        };
511
512        Ok(FtsMatchInfo {
513            prefix,
514            stemming: fts_query.match_required_stemming(&fts_content),
515        })
516    }
517
518    fn fetch_categories_for_suggestion(&self, suggestion_id: i64) -> Result<Vec<i32>> {
519        let mut category_stmt = self.conn.prepare(
520            r#"
521            SELECT category FROM serp_categories WHERE suggestion_id = :suggestion_id
522            "#,
523        )?;
524
525        let categories: Option<Vec<i32>> = category_stmt
526            .query_map(
527                named_params! {
528                    ":suggestion_id": suggestion_id
529                },
530                |row| {
531                    let category: Option<i32> = row.get(0)?;
532                    Ok(category)
533                },
534            )?
535            .collect::<std::result::Result<Vec<_>, _>>()?
536            .into_iter()
537            .collect();
538
539        Ok(categories.unwrap_or_default())
540    }
541
542    /// Fetches Suggestions of type Wikipedia provider that match the given query
543    pub fn fetch_wikipedia_suggestions(&self, query: &SuggestionQuery) -> Result<Vec<Suggestion>> {
544        let keyword_lowercased = &query.keyword.to_lowercase();
545        let suggestions = self.conn.query_rows_and_then_cached(
546            r#"
547            SELECT
548              s.id,
549              k.rank,
550              s.title,
551              s.url
552            FROM
553              suggestions s
554            JOIN
555              keywords k
556              ON k.suggestion_id = s.id
557            WHERE
558              s.provider = :provider
559              AND k.keyword = :keyword
560              AND NOT EXISTS (SELECT 1 FROM dismissed_suggestions WHERE url=s.url)
561            "#,
562            named_params! {
563                ":keyword": keyword_lowercased,
564                ":provider": SuggestionProvider::Wikipedia
565            },
566            |row| -> Result<Suggestion> {
567                let suggestion_id: i64 = row.get("id")?;
568                let title = row.get("title")?;
569                let raw_url = row.get::<_, String>("url")?;
570
571                let keywords: Vec<String> = self.conn.query_rows_and_then_cached(
572                    "SELECT keyword FROM keywords
573                     WHERE suggestion_id = :suggestion_id AND rank >= :rank
574                     ORDER BY rank ASC",
575                    named_params! {
576                        ":suggestion_id": suggestion_id,
577                        ":rank": row.get::<_, i64>("rank")?,
578                    },
579                    |row| row.get(0),
580                )?;
581                let (icon, icon_mimetype) = self
582                    .conn
583                    .try_query_row(
584                        "SELECT i.data, i.mimetype
585                     FROM icons i
586                     JOIN wikipedia_custom_details s ON s.icon_id = i.id
587                     WHERE s.suggestion_id = :suggestion_id
588                     LIMIT 1",
589                        named_params! {
590                            ":suggestion_id": suggestion_id
591                        },
592                        |row| -> Result<_> {
593                            Ok((
594                                row.get::<_, Option<Vec<u8>>>(0)?,
595                                row.get::<_, Option<String>>(1)?,
596                            ))
597                        },
598                        true,
599                    )?
600                    .unwrap_or((None, None));
601
602                Ok(Suggestion::Wikipedia {
603                    title,
604                    url: raw_url,
605                    full_keyword: full_keyword(keyword_lowercased, &keywords),
606                    icon,
607                    icon_mimetype,
608                })
609            },
610        )?;
611        Ok(suggestions)
612    }
613
614    /// Query for suggestions using the keyword prefix and provider
615    fn map_prefix_keywords<T>(
616        &self,
617        query: &SuggestionQuery,
618        provider: &SuggestionProvider,
619        mut mapper: impl FnMut(&rusqlite::Row, &str) -> Result<T>,
620    ) -> Result<Vec<T>> {
621        let keyword_lowercased = &query.keyword.to_lowercase();
622        let (keyword_prefix, keyword_suffix) = split_keyword(keyword_lowercased);
623        let suggestions_limit = query.limit.unwrap_or(-1);
624        self.conn.query_rows_and_then_cached(
625            r#"
626                SELECT
627                  s.id,
628                  MAX(k.rank) AS rank,
629                  s.title,
630                  s.url,
631                  s.provider,
632                  s.score,
633                  k.keyword_suffix
634                FROM
635                  suggestions s
636                JOIN
637                  prefix_keywords k
638                  ON k.suggestion_id = s.id
639                WHERE
640                  k.keyword_prefix = :keyword_prefix
641                  AND (k.keyword_suffix BETWEEN :keyword_suffix AND :keyword_suffix || x'FFFF')
642                  AND s.provider = :provider
643                  AND NOT EXISTS (SELECT 1 FROM dismissed_suggestions WHERE url=s.url)
644                GROUP BY
645                  s.id
646                ORDER BY
647                  s.score DESC,
648                  rank DESC
649                LIMIT
650                  :suggestions_limit
651                "#,
652            &[
653                (":keyword_prefix", &keyword_prefix as &dyn ToSql),
654                (":keyword_suffix", &keyword_suffix as &dyn ToSql),
655                (":provider", provider as &dyn ToSql),
656                (":suggestions_limit", &suggestions_limit as &dyn ToSql),
657            ],
658            |row| mapper(row, keyword_suffix),
659        )
660    }
661
662    /// Fetches Suggestions of type Amo provider that match the given query
663    pub fn fetch_amo_suggestions(&self, query: &SuggestionQuery) -> Result<Vec<Suggestion>> {
664        let suggestions = self
665            .map_prefix_keywords(
666                query,
667                &SuggestionProvider::Amo,
668                |row, keyword_suffix| -> Result<Option<Suggestion>> {
669                    let suggestion_id: i64 = row.get("id")?;
670                    let title = row.get("title")?;
671                    let raw_url = row.get::<_, String>("url")?;
672                    let score = row.get::<_, f64>("score")?;
673
674                    let full_suffix = row.get::<_, String>("keyword_suffix")?;
675                    full_suffix
676                        .starts_with(keyword_suffix)
677                        .then(|| {
678                            self.conn.query_row_and_then(
679                                r#"
680                                SELECT
681                                  amo.description,
682                                  amo.guid,
683                                  amo.rating,
684                                  amo.icon_url,
685                                  amo.number_of_ratings
686                                FROM
687                                  amo_custom_details amo
688                                WHERE
689                                  amo.suggestion_id = :suggestion_id
690                                "#,
691                                named_params! {
692                                    ":suggestion_id": suggestion_id
693                                },
694                                |row| {
695                                    Ok(Suggestion::Amo {
696                                        title,
697                                        url: raw_url,
698                                        icon_url: row.get("icon_url")?,
699                                        description: row.get("description")?,
700                                        rating: row.get("rating")?,
701                                        number_of_ratings: row.get("number_of_ratings")?,
702                                        guid: row.get("guid")?,
703                                        score,
704                                    })
705                                },
706                            )
707                        })
708                        .transpose()
709                },
710            )?
711            .into_iter()
712            .flatten()
713            .collect();
714        Ok(suggestions)
715    }
716
717    /// Fetches suggestions for MDN
718    pub fn fetch_mdn_suggestions(&self, query: &SuggestionQuery) -> Result<Vec<Suggestion>> {
719        let suggestions = self
720            .map_prefix_keywords(
721                query,
722                &SuggestionProvider::Mdn,
723                |row, keyword_suffix| -> Result<Option<Suggestion>> {
724                    let suggestion_id: i64 = row.get("id")?;
725                    let title = row.get("title")?;
726                    let raw_url = row.get::<_, String>("url")?;
727                    let score = row.get::<_, f64>("score")?;
728
729                    let full_suffix = row.get::<_, String>("keyword_suffix")?;
730                    full_suffix
731                        .starts_with(keyword_suffix)
732                        .then(|| {
733                            self.conn.query_row_and_then(
734                                r#"
735                                SELECT
736                                    description
737                                FROM
738                                    mdn_custom_details
739                                WHERE
740                                    suggestion_id = :suggestion_id
741                                "#,
742                                named_params! {
743                                    ":suggestion_id": suggestion_id
744                                },
745                                |row| {
746                                    Ok(Suggestion::Mdn {
747                                        title,
748                                        url: raw_url,
749                                        description: row.get("description")?,
750                                        score,
751                                    })
752                                },
753                            )
754                        })
755                        .transpose()
756                },
757            )?
758            .into_iter()
759            .flatten()
760            .collect();
761
762        Ok(suggestions)
763    }
764
765    /// Fetches Fakespot suggestions
766    pub fn fetch_fakespot_suggestions(&self, query: &SuggestionQuery) -> Result<Vec<Suggestion>> {
767        let fts_query = query.fts_query();
768        let sql = r#"
769            SELECT
770                s.id,
771                s.title,
772                s.url,
773                s.score,
774                f.fakespot_grade,
775                f.product_id,
776                f.rating,
777                f.total_reviews,
778                i.data,
779                i.mimetype,
780                f.keywords,
781                f.product_type
782            FROM
783                suggestions s
784            JOIN
785                fakespot_fts fts
786                ON fts.rowid = s.id
787            JOIN
788                fakespot_custom_details f
789                ON f.suggestion_id = s.id
790            LEFT JOIN
791                icons i
792                ON i.id = f.icon_id
793            WHERE
794                fakespot_fts MATCH ?
795            ORDER BY
796                s.score DESC
797            "#
798        .to_string();
799
800        // Store the list of results plus the suggestion id for calculating the FTS match info
801        let mut results =
802            self.conn
803                .query_rows_and_then_cached(&sql, (&fts_query.match_arg,), |row| {
804                    let id: usize = row.get(0)?;
805                    let score = fakespot::FakespotScore::new(
806                        &query.keyword,
807                        row.get(10)?,
808                        row.get(11)?,
809                        row.get(3)?,
810                    )
811                    .as_suggest_score();
812                    Result::Ok((
813                        Suggestion::Fakespot {
814                            title: row.get(1)?,
815                            url: row.get(2)?,
816                            score,
817                            fakespot_grade: row.get(4)?,
818                            product_id: row.get(5)?,
819                            rating: row.get(6)?,
820                            total_reviews: row.get(7)?,
821                            icon: row.get(8)?,
822                            icon_mimetype: row.get(9)?,
823                            match_info: None,
824                        },
825                        id,
826                    ))
827                })?;
828        // Sort the results, then add the FTS match info to the first one
829        // For performance reasons, this is only calculated for the result with the highest score.
830        // We assume that only one that will be shown to the user and therefore the only one we'll
831        // collect metrics for.
832        results.sort();
833        if let Some((suggestion, id)) = results.first_mut() {
834            match suggestion {
835                Suggestion::Fakespot {
836                    match_info, title, ..
837                } => {
838                    *match_info = Some(self.fetch_fakespot_fts_match_info(&fts_query, *id, title)?);
839                }
840                _ => unreachable!(),
841            }
842        }
843        Ok(results
844            .into_iter()
845            .map(|(suggestion, _)| suggestion)
846            .collect())
847    }
848
849    fn fetch_fakespot_fts_match_info(
850        &self,
851        fts_query: &FtsQuery<'_>,
852        suggestion_id: usize,
853        title: &str,
854    ) -> Result<FtsMatchInfo> {
855        let prefix = if fts_query.is_prefix_query {
856            // If the query was a prefix match query then test if the query without the prefix
857            // match would have also matched.  If not, then this counts as a prefix match.
858            let sql = "SELECT 1 FROM fakespot_fts WHERE rowid = ? AND fakespot_fts MATCH ?";
859            let params = (&suggestion_id, &fts_query.match_arg_without_prefix_match);
860            !self.conn.exists(sql, params)?
861        } else {
862            // If not, then it definitely wasn't a prefix match
863            false
864        };
865
866        Ok(FtsMatchInfo {
867            prefix,
868            stemming: fts_query.match_required_stemming(title),
869        })
870    }
871
872    /// Fetches dynamic suggestions
873    pub fn fetch_dynamic_suggestions(&self, query: &SuggestionQuery) -> Result<Vec<Suggestion>> {
874        let Some(suggestion_types) = query
875            .provider_constraints
876            .as_ref()
877            .and_then(|c| c.dynamic_suggestion_types.as_ref())
878        else {
879            return Ok(vec![]);
880        };
881
882        let keyword = query.keyword.to_lowercase();
883        let params = rusqlite::params_from_iter(
884            std::iter::once(&SuggestionProvider::Dynamic as &dyn ToSql)
885                .chain(std::iter::once(&keyword as &dyn ToSql))
886                .chain(suggestion_types.iter().map(|t| t as &dyn ToSql)),
887        );
888        self.conn.query_rows_and_then_cached(
889            &format!(
890                r#"
891                SELECT
892                  s.url,
893                  s.score,
894                  d.suggestion_type,
895                  d.json_data
896                FROM
897                  suggestions s
898                JOIN
899                  dynamic_custom_details d
900                  ON d.suggestion_id = s.id
901                JOIN
902                  keywords k
903                  ON k.suggestion_id = s.id
904                WHERE
905                  s.provider = ?
906                  AND k.keyword = ?
907                  AND d.suggestion_type IN ({})
908                  AND NOT EXISTS (
909                    SELECT 1 FROM dismissed_dynamic_suggestions
910                    WHERE dismissal_key = s.url AND suggestion_type = d.suggestion_type
911                  )
912                ORDER BY
913                  s.score ASC, d.suggestion_type ASC, s.id ASC
914                "#,
915                repeat_sql_vars(suggestion_types.len())
916            ),
917            params,
918            |row| -> Result<Suggestion> {
919                let dismissal_key: String = row.get("url")?;
920                let json_data: Option<String> = row.get("json_data")?;
921                Ok(Suggestion::Dynamic {
922                    suggestion_type: row.get("suggestion_type")?,
923                    data: match json_data {
924                        None => None,
925                        Some(j) => serde_json::from_str(&j)?,
926                    },
927                    score: row.get("score")?,
928                    dismissal_key: (!dismissal_key.is_empty()).then_some(dismissal_key),
929                })
930            },
931        )
932    }
933
934    pub fn are_suggestions_ingested_for_record(&self, record_id: &SuggestRecordId) -> Result<bool> {
935        Ok(self.conn.exists(
936            r#"
937            SELECT
938              id
939            FROM
940              suggestions
941            WHERE
942              record_id = :record_id
943            "#,
944            named_params! {
945                ":record_id": record_id.as_str(),
946            },
947        )?)
948    }
949
950    pub fn is_amp_fts_data_ingested(&self, record_id: &SuggestRecordId) -> Result<bool> {
951        Ok(self.conn.exists(
952            r#"
953            SELECT 1
954            FROM suggestions s
955            JOIN amp_fts fts
956              ON fts.rowid = s.id
957            WHERE s.record_id = :record_id
958            "#,
959            named_params! {
960                ":record_id": record_id.as_str(),
961            },
962        )?)
963    }
964
965    /// Inserts all suggestions from a downloaded AMO attachment into
966    /// the database.
967    pub fn insert_amo_suggestions(
968        &mut self,
969        record_id: &SuggestRecordId,
970        suggestions: &[DownloadedAmoSuggestion],
971    ) -> Result<()> {
972        let mut suggestion_insert = SuggestionInsertStatement::new(self.conn)?;
973        let mut amo_insert = AmoInsertStatement::new(self.conn)?;
974        let mut prefix_keyword_insert = PrefixKeywordInsertStatement::new(self.conn)?;
975        for suggestion in suggestions {
976            self.scope.err_if_interrupted()?;
977            let suggestion_id = suggestion_insert.execute(
978                record_id,
979                &suggestion.title,
980                &suggestion.url,
981                suggestion.score,
982                SuggestionProvider::Amo,
983            )?;
984            amo_insert.execute(suggestion_id, suggestion)?;
985            for (index, keyword) in suggestion.keywords.iter().enumerate() {
986                let (keyword_prefix, keyword_suffix) = split_keyword(keyword);
987                prefix_keyword_insert.execute(
988                    suggestion_id,
989                    None,
990                    keyword_prefix,
991                    keyword_suffix,
992                    index,
993                )?;
994            }
995        }
996        Ok(())
997    }
998
999    /// Inserts suggestions from an AMP attachment into the database.
1000    pub fn insert_amp_suggestions(
1001        &mut self,
1002        record_id: &SuggestRecordId,
1003        suggestions: &[DownloadedAmpSuggestion],
1004        enable_fts: bool,
1005    ) -> Result<()> {
1006        // Prepare statements outside of the loop.  This results in a large performance
1007        // improvement on a fresh ingest, since there are so many rows.
1008        let mut suggestion_insert = SuggestionInsertStatement::new(self.conn)?;
1009        let mut amp_insert = AmpInsertStatement::new(self.conn)?;
1010        let mut keyword_insert = KeywordInsertStatement::new(self.conn)?;
1011        let mut fts_insert = AmpFtsInsertStatement::new(self.conn)?;
1012        let mut category_insert = CategoryInsertStatement::new(self.conn)?;
1013        for suggestion in suggestions {
1014            self.scope.err_if_interrupted()?;
1015            let suggestion_id = suggestion_insert.execute(
1016                record_id,
1017                &suggestion.title,
1018                &suggestion.url,
1019                suggestion.score.unwrap_or(DEFAULT_SUGGESTION_SCORE),
1020                SuggestionProvider::Amp,
1021            )?;
1022            amp_insert.execute(suggestion_id, suggestion)?;
1023            if enable_fts {
1024                fts_insert.execute(
1025                    suggestion_id,
1026                    &suggestion.full_keywords_fts_column(),
1027                    &suggestion.title,
1028                )?;
1029            }
1030            let mut full_keyword_inserter = FullKeywordInserter::new(self.conn, suggestion_id);
1031            for keyword in suggestion.keywords() {
1032                let full_keyword_id = if let Some(full_keyword) = keyword.full_keyword {
1033                    Some(full_keyword_inserter.maybe_insert(full_keyword)?)
1034                } else {
1035                    None
1036                };
1037                keyword_insert.execute(
1038                    suggestion_id,
1039                    keyword.keyword,
1040                    full_keyword_id,
1041                    keyword.rank,
1042                )?;
1043            }
1044
1045            if let Some(categories) = &suggestion.serp_categories {
1046                for category in categories {
1047                    category_insert.execute(suggestion_id, *category)?;
1048                }
1049            }
1050        }
1051        Ok(())
1052    }
1053
1054    /// Inserts suggestions from a Wikipedia attachment into the database.
1055    pub fn insert_wikipedia_suggestions(
1056        &mut self,
1057        record_id: &SuggestRecordId,
1058        suggestions: &[DownloadedWikipediaSuggestion],
1059    ) -> Result<()> {
1060        // Prepare statements outside of the loop.  This results in a large performance
1061        // improvement on a fresh ingest, since there are so many rows.
1062        let mut suggestion_insert = SuggestionInsertStatement::new(self.conn)?;
1063        let mut wiki_insert = WikipediaInsertStatement::new(self.conn)?;
1064        let mut keyword_insert = KeywordInsertStatement::new(self.conn)?;
1065        for suggestion in suggestions {
1066            self.scope.err_if_interrupted()?;
1067            let suggestion_id = suggestion_insert.execute(
1068                record_id,
1069                &suggestion.title,
1070                &suggestion.url,
1071                suggestion.score.unwrap_or(DEFAULT_SUGGESTION_SCORE),
1072                SuggestionProvider::Wikipedia,
1073            )?;
1074            wiki_insert.execute(suggestion_id, suggestion)?;
1075            for keyword in suggestion.keywords() {
1076                // Don't update `full_keywords`, see bug 1876217.
1077                keyword_insert.execute(suggestion_id, keyword.keyword, None, keyword.rank)?;
1078            }
1079        }
1080        Ok(())
1081    }
1082
1083    /// Inserts all suggestions from a downloaded MDN attachment into
1084    /// the database.
1085    pub fn insert_mdn_suggestions(
1086        &mut self,
1087        record_id: &SuggestRecordId,
1088        suggestions: &[DownloadedMdnSuggestion],
1089    ) -> Result<()> {
1090        let mut suggestion_insert = SuggestionInsertStatement::new(self.conn)?;
1091        let mut mdn_insert = MdnInsertStatement::new(self.conn)?;
1092        let mut prefix_keyword_insert = PrefixKeywordInsertStatement::new(self.conn)?;
1093        for suggestion in suggestions {
1094            self.scope.err_if_interrupted()?;
1095            let suggestion_id = suggestion_insert.execute(
1096                record_id,
1097                &suggestion.title,
1098                &suggestion.url,
1099                suggestion.score,
1100                SuggestionProvider::Mdn,
1101            )?;
1102            mdn_insert.execute(suggestion_id, suggestion)?;
1103            for (index, keyword) in suggestion.keywords.iter().enumerate() {
1104                let (keyword_prefix, keyword_suffix) = split_keyword(keyword);
1105                prefix_keyword_insert.execute(
1106                    suggestion_id,
1107                    None,
1108                    keyword_prefix,
1109                    keyword_suffix,
1110                    index,
1111                )?;
1112            }
1113        }
1114        Ok(())
1115    }
1116
1117    /// Inserts all suggestions from a downloaded Fakespot attachment into the database.
1118    pub fn insert_fakespot_suggestions(
1119        &mut self,
1120        record_id: &SuggestRecordId,
1121        suggestions: &[DownloadedFakespotSuggestion],
1122    ) -> Result<()> {
1123        let mut suggestion_insert = SuggestionInsertStatement::new(self.conn)?;
1124        let mut fakespot_insert = FakespotInsertStatement::new(self.conn)?;
1125        for suggestion in suggestions {
1126            let suggestion_id = suggestion_insert.execute(
1127                record_id,
1128                &suggestion.title,
1129                &suggestion.url,
1130                suggestion.score,
1131                SuggestionProvider::Fakespot,
1132            )?;
1133            fakespot_insert.execute(suggestion_id, suggestion)?;
1134        }
1135        Ok(())
1136    }
1137
1138    /// Inserts dynamic suggestion records data into the database.
1139    pub fn insert_dynamic_suggestions(
1140        &mut self,
1141        record_id: &SuggestRecordId,
1142        record: &DownloadedDynamicRecord,
1143        suggestions: &[DownloadedDynamicSuggestion],
1144    ) -> Result<()> {
1145        // `suggestion.keywords()` can yield duplicates for dynamic
1146        // suggestions, so ignore failures on insert in the uniqueness
1147        // constraint on `(suggestion_id, keyword)`.
1148        let mut keyword_insert = KeywordInsertStatement::with_details(
1149            self.conn,
1150            "keywords",
1151            Some(InsertConflictResolution::Ignore),
1152        )?;
1153        let mut suggestion_insert = SuggestionInsertStatement::new(self.conn)?;
1154        let mut dynamic_insert = DynamicInsertStatement::new(self.conn)?;
1155        for suggestion in suggestions {
1156            self.scope.err_if_interrupted()?;
1157            let suggestion_id = suggestion_insert.execute(
1158                record_id,
1159                // title - Not used by dynamic suggestions.
1160                "",
1161                // url - Dynamic suggestions store their dismissal key here
1162                // instead.
1163                suggestion.dismissal_key.as_deref().unwrap_or(""),
1164                record.score.unwrap_or(DEFAULT_SUGGESTION_SCORE),
1165                SuggestionProvider::Dynamic,
1166            )?;
1167            dynamic_insert.execute(suggestion_id, &record.suggestion_type, suggestion)?;
1168
1169            // Dynamic suggestions don't use `rank` but `(suggestion_id, rank)`
1170            // must be unique since there's an index on that tuple.
1171            for (rank, keyword) in suggestion.keywords().enumerate() {
1172                keyword_insert.execute(suggestion_id, &keyword, None, rank)?;
1173            }
1174        }
1175        Ok(())
1176    }
1177
1178    /// Inserts or replaces an icon for a suggestion into the database.
1179    pub fn put_icon(&mut self, icon_id: &str, data: &[u8], mimetype: &str) -> Result<()> {
1180        self.conn.execute(
1181            "INSERT OR REPLACE INTO icons(
1182                 id,
1183                 data,
1184                 mimetype
1185             )
1186             VALUES(
1187                 :id,
1188                 :data,
1189                 :mimetype
1190             )",
1191            named_params! {
1192                ":id": icon_id,
1193                ":data": data,
1194                ":mimetype": mimetype,
1195            },
1196        )?;
1197        Ok(())
1198    }
1199
1200    pub fn insert_dismissal(&self, key: &str) -> Result<()> {
1201        self.conn.execute(
1202            "INSERT OR IGNORE INTO dismissed_suggestions(url)
1203             VALUES(:url)",
1204            named_params! {
1205                ":url": key,
1206            },
1207        )?;
1208        Ok(())
1209    }
1210
1211    pub fn insert_dynamic_dismissal(&self, suggestion_type: &str, key: &str) -> Result<()> {
1212        self.conn.execute(
1213            "INSERT OR IGNORE INTO dismissed_dynamic_suggestions(suggestion_type, dismissal_key)
1214             VALUES(:suggestion_type, :dismissal_key)",
1215            named_params! {
1216                ":suggestion_type": suggestion_type,
1217                ":dismissal_key": key,
1218            },
1219        )?;
1220        Ok(())
1221    }
1222
1223    pub fn clear_dismissals(&self) -> Result<()> {
1224        self.conn.execute_batch(
1225            "DELETE FROM dismissed_suggestions;
1226             DELETE FROM dismissed_dynamic_suggestions;",
1227        )?;
1228        Ok(())
1229    }
1230
1231    pub fn has_dismissal(&self, key: &str) -> Result<bool> {
1232        Ok(self.conn.exists(
1233            "SELECT 1 FROM dismissed_suggestions WHERE url = :url",
1234            named_params! {
1235                ":url": key,
1236            },
1237        )?)
1238    }
1239
1240    pub fn has_dynamic_dismissal(&self, suggestion_type: &str, key: &str) -> Result<bool> {
1241        Ok(self.conn.exists(
1242            "SELECT 1
1243             FROM dismissed_dynamic_suggestions
1244             WHERE suggestion_type = :suggestion_type AND dismissal_key = :dismissal_key",
1245            named_params! {
1246                ":suggestion_type": suggestion_type,
1247                ":dismissal_key": key,
1248            },
1249        )?)
1250    }
1251
1252    pub fn any_dismissals(&self) -> Result<bool> {
1253        Ok(self.conn.query_row(
1254            "SELECT
1255             EXISTS(SELECT 1 FROM dismissed_suggestions)
1256               OR
1257             EXISTS(SELECT 1 FROM dismissed_dynamic_suggestions)",
1258            (),
1259            |row| row.get(0),
1260        )?)
1261    }
1262
1263    /// Deletes all suggestions associated with a Remote Settings record from
1264    /// the database.
1265    pub fn drop_suggestions(&mut self, record_id: &SuggestRecordId) -> Result<()> {
1266        // If you update this, you probably need to update
1267        // `schema::clear_database()` too!
1268        //
1269        // Call `err_if_interrupted` before each statement since these have historically taken a
1270        // long time and caused shutdown hangs.
1271
1272        self.scope.err_if_interrupted()?;
1273        self.conn.execute_cached(
1274            "DELETE FROM keywords WHERE suggestion_id IN (SELECT id from suggestions WHERE record_id = :record_id)",
1275            named_params! { ":record_id": record_id.as_str() },
1276        )?;
1277        self.scope.err_if_interrupted()?;
1278        self.conn.execute_cached(
1279            "DELETE FROM keywords_i18n WHERE suggestion_id IN (SELECT id from suggestions WHERE record_id = :record_id)",
1280            named_params! { ":record_id": record_id.as_str() },
1281        )?;
1282        self.scope.err_if_interrupted()?;
1283        self.conn.execute_cached(
1284            "DELETE FROM full_keywords WHERE suggestion_id IN (SELECT id from suggestions WHERE record_id = :record_id)",
1285            named_params! { ":record_id": record_id.as_str() },
1286        )?;
1287        self.scope.err_if_interrupted()?;
1288        self.conn.execute_cached(
1289            "DELETE FROM prefix_keywords WHERE suggestion_id IN (SELECT id from suggestions WHERE record_id = :record_id)",
1290            named_params! { ":record_id": record_id.as_str() },
1291        )?;
1292        self.scope.err_if_interrupted()?;
1293        self.conn.execute_cached(
1294            "DELETE FROM keywords_metrics WHERE record_id = :record_id",
1295            named_params! { ":record_id": record_id.as_str() },
1296        )?;
1297        self.scope.err_if_interrupted()?;
1298        self.conn.execute_cached(
1299            "
1300            DELETE FROM fakespot_fts
1301            WHERE rowid IN (SELECT id from suggestions WHERE record_id = :record_id)
1302            ",
1303            named_params! { ":record_id": record_id.as_str() },
1304        )?;
1305        self.scope.err_if_interrupted()?;
1306        self.conn.execute_cached(
1307            "DELETE FROM suggestions WHERE record_id = :record_id",
1308            named_params! { ":record_id": record_id.as_str() },
1309        )?;
1310        self.scope.err_if_interrupted()?;
1311        self.conn.execute_cached(
1312            "DELETE FROM yelp_subjects WHERE record_id = :record_id",
1313            named_params! { ":record_id": record_id.as_str() },
1314        )?;
1315        self.scope.err_if_interrupted()?;
1316        self.conn.execute_cached(
1317            "DELETE FROM yelp_modifiers WHERE record_id = :record_id",
1318            named_params! { ":record_id": record_id.as_str() },
1319        )?;
1320        self.scope.err_if_interrupted()?;
1321        self.conn.execute_cached(
1322            "DELETE FROM yelp_custom_details WHERE record_id = :record_id",
1323            named_params! { ":record_id": record_id.as_str() },
1324        )?;
1325        self.scope.err_if_interrupted()?;
1326        self.conn.execute_cached(
1327            "DELETE FROM geonames WHERE record_id = :record_id",
1328            named_params! { ":record_id": record_id.as_str() },
1329        )?;
1330        self.scope.err_if_interrupted()?;
1331        self.conn.execute_cached(
1332            "DELETE FROM geonames_alternates WHERE record_id = :record_id",
1333            named_params! { ":record_id": record_id.as_str() },
1334        )?;
1335        self.scope.err_if_interrupted()?;
1336        self.conn.execute_cached(
1337            "DELETE FROM geonames_metrics WHERE record_id = :record_id",
1338            named_params! { ":record_id": record_id.as_str() },
1339        )?;
1340        self.scope.err_if_interrupted()?;
1341        self.conn.execute_cached(
1342            "DELETE FROM serp_categories WHERE suggestion_id IN (SELECT id from suggestions WHERE record_id = :record_id)",
1343            named_params! { ":record_id": record_id.as_str() },
1344        )?;
1345
1346        // Invalidate these caches since we might have deleted a record their
1347        // contents are based on.
1348        self.weather_cache.take();
1349        self.geoname_cache.take();
1350
1351        Ok(())
1352    }
1353
1354    /// Deletes an icon for a suggestion from the database.
1355    pub fn drop_icon(&mut self, icon_id: &str) -> Result<()> {
1356        self.conn.execute_cached(
1357            "DELETE FROM icons WHERE id = :id",
1358            named_params! { ":id": icon_id },
1359        )?;
1360        Ok(())
1361    }
1362
1363    /// Clears the database, removing all suggestions, icons, and metadata.
1364    pub fn clear(&mut self) -> Result<()> {
1365        Ok(clear_database(self.conn)?)
1366    }
1367
1368    /// Returns the value associated with a metadata key.
1369    pub fn get_meta<T: FromSql>(&self, key: &str) -> Result<Option<T>> {
1370        Ok(self.conn.try_query_one(
1371            "SELECT value FROM meta WHERE key = :key",
1372            named_params! { ":key": key },
1373            true,
1374        )?)
1375    }
1376
1377    /// Sets the value for a metadata key.
1378    pub fn put_meta(&mut self, key: &str, value: impl ToSql) -> Result<()> {
1379        self.conn.execute_cached(
1380            "INSERT OR REPLACE INTO meta(key, value) VALUES(:key, :value)",
1381            named_params! { ":key": key, ":value": value },
1382        )?;
1383        Ok(())
1384    }
1385
1386    /// Stores global Suggest configuration data.
1387    pub fn put_global_config(&mut self, config: &SuggestGlobalConfig) -> Result<()> {
1388        self.put_meta(GLOBAL_CONFIG_META_KEY, serde_json::to_string(config)?)
1389    }
1390
1391    /// Gets the stored global Suggest configuration data or a default config if
1392    /// none is stored.
1393    pub fn get_global_config(&self) -> Result<SuggestGlobalConfig> {
1394        self.get_meta::<String>(GLOBAL_CONFIG_META_KEY)?
1395            .map_or_else(
1396                || Ok(SuggestGlobalConfig::default()),
1397                |json| Ok(serde_json::from_str(&json)?),
1398            )
1399    }
1400
1401    /// Stores configuration data for a given provider.
1402    pub fn put_provider_config(
1403        &mut self,
1404        provider: SuggestionProvider,
1405        config: &SuggestProviderConfig,
1406    ) -> Result<()> {
1407        self.put_meta(
1408            &provider_config_meta_key(provider),
1409            serde_json::to_string(config)?,
1410        )
1411    }
1412
1413    /// Gets the stored configuration data for a given provider or None if none
1414    /// is stored.
1415    pub fn get_provider_config(
1416        &self,
1417        provider: SuggestionProvider,
1418    ) -> Result<Option<SuggestProviderConfig>> {
1419        self.get_meta::<String>(&provider_config_meta_key(provider))?
1420            .map_or_else(|| Ok(None), |json| Ok(serde_json::from_str(&json)?))
1421    }
1422
1423    /// Gets keywords metrics for a record type.
1424    pub fn get_keywords_metrics(&self, record_type: SuggestRecordType) -> Result<KeywordsMetrics> {
1425        let data = self.conn.try_query_row(
1426            r#"
1427            SELECT
1428                max(max_len) AS len,
1429                max(max_word_count) AS word_count
1430            FROM
1431                keywords_metrics
1432            WHERE
1433                record_type = :record_type
1434            "#,
1435            named_params! {
1436                ":record_type": record_type,
1437            },
1438            |row| -> Result<(usize, usize)> { Ok((row.get("len")?, row.get("word_count")?)) },
1439            true, // cache
1440        )?;
1441        Ok(data
1442            .map(|(max_len, max_word_count)| KeywordsMetrics {
1443                max_len,
1444                max_word_count,
1445            })
1446            .unwrap_or_default())
1447    }
1448}
1449
1450#[derive(Debug, PartialEq, Eq, Hash)]
1451pub struct IngestedRecord {
1452    pub id: SuggestRecordId,
1453    pub collection: String,
1454    pub record_type: String,
1455    pub last_modified: u64,
1456}
1457
1458impl IngestedRecord {
1459    fn from_row(row: &rusqlite::Row) -> Result<Self> {
1460        Ok(Self {
1461            id: SuggestRecordId::new(row.get("id")?),
1462            collection: row.get("collection")?,
1463            record_type: row.get("type")?,
1464            last_modified: row.get("last_modified")?,
1465        })
1466    }
1467}
1468
1469/// Helper struct to get full_keyword_ids for a suggestion
1470///
1471/// `FullKeywordInserter` handles repeated full keywords efficiently.  The first instance will
1472/// cause a row to be inserted into the database.  Subsequent instances will return the same
1473/// full_keyword_id.
1474struct FullKeywordInserter<'a> {
1475    conn: &'a Connection,
1476    suggestion_id: i64,
1477    last_inserted: Option<(&'a str, i64)>,
1478}
1479
1480impl<'a> FullKeywordInserter<'a> {
1481    fn new(conn: &'a Connection, suggestion_id: i64) -> Self {
1482        Self {
1483            conn,
1484            suggestion_id,
1485            last_inserted: None,
1486        }
1487    }
1488
1489    fn maybe_insert(&mut self, full_keyword: &'a str) -> rusqlite::Result<i64> {
1490        match self.last_inserted {
1491            Some((s, id)) if s == full_keyword => Ok(id),
1492            _ => {
1493                let full_keyword_id = self.conn.query_row_and_then(
1494                    "INSERT INTO full_keywords(
1495                        suggestion_id,
1496                        full_keyword
1497                     )
1498                     VALUES(
1499                        :suggestion_id,
1500                        :keyword
1501                     )
1502                     RETURNING id",
1503                    named_params! {
1504                        ":keyword": full_keyword,
1505                        ":suggestion_id": self.suggestion_id,
1506                    },
1507                    |row| row.get(0),
1508                )?;
1509                self.last_inserted = Some((full_keyword, full_keyword_id));
1510                Ok(full_keyword_id)
1511            }
1512        }
1513    }
1514}
1515
1516// ======================== Statement types ========================
1517//
1518// During ingestion we can insert hundreds of thousands of rows.  These types enable speedups by
1519// allowing us to prepare a statement outside a loop and use it many times inside the loop.
1520//
1521// Each type wraps [Connection::prepare] and [Statement] to provide a simplified interface,
1522// tailored to a specific query.
1523//
1524// This pattern is applicable for whenever we execute the same query repeatedly in a loop.
1525// The impact scales with the number of loop iterations, which is why we currently don't do this
1526// for providers like Mdn and Weather, which have relatively small number of records
1527// compared to Amp/Wikipedia.
1528
1529pub(crate) struct SuggestionInsertStatement<'conn>(rusqlite::Statement<'conn>);
1530
1531impl<'conn> SuggestionInsertStatement<'conn> {
1532    pub(crate) fn new(conn: &'conn Connection) -> Result<Self> {
1533        Ok(Self(conn.prepare(
1534            "INSERT INTO suggestions(
1535                 record_id,
1536                 title,
1537                 url,
1538                 score,
1539                 provider
1540             )
1541             VALUES(?, ?, ?, ?, ?)
1542             RETURNING id",
1543        )?))
1544    }
1545
1546    /// Execute the insert and return the `suggestion_id` for the new row
1547    pub(crate) fn execute(
1548        &mut self,
1549        record_id: &SuggestRecordId,
1550        title: &str,
1551        url: &str,
1552        score: f64,
1553        provider: SuggestionProvider,
1554    ) -> Result<i64> {
1555        self.0
1556            .query_row(
1557                (record_id.as_str(), title, url, score, provider as u8),
1558                |row| row.get(0),
1559            )
1560            .with_context("suggestion insert")
1561    }
1562}
1563
1564struct AmpInsertStatement<'conn>(rusqlite::Statement<'conn>);
1565
1566impl<'conn> AmpInsertStatement<'conn> {
1567    fn new(conn: &'conn Connection) -> Result<Self> {
1568        Ok(Self(conn.prepare(
1569            "INSERT INTO amp_custom_details(
1570                 suggestion_id,
1571                 advertiser,
1572                 block_id,
1573                 iab_category,
1574                 impression_url,
1575                 click_url,
1576                 icon_id
1577             )
1578             VALUES(?, ?, ?, ?, ?, ?, ?)
1579             ",
1580        )?))
1581    }
1582
1583    fn execute(&mut self, suggestion_id: i64, amp: &DownloadedAmpSuggestion) -> Result<()> {
1584        self.0
1585            .execute((
1586                suggestion_id,
1587                &amp.advertiser,
1588                amp.block_id,
1589                &amp.iab_category,
1590                &amp.impression_url,
1591                &amp.click_url,
1592                &amp.icon_id,
1593            ))
1594            .with_context("amp insert")?;
1595        Ok(())
1596    }
1597}
1598
1599struct WikipediaInsertStatement<'conn>(rusqlite::Statement<'conn>);
1600
1601impl<'conn> WikipediaInsertStatement<'conn> {
1602    fn new(conn: &'conn Connection) -> Result<Self> {
1603        Ok(Self(conn.prepare(
1604            "INSERT INTO wikipedia_custom_details(
1605                 suggestion_id,
1606                 icon_id
1607             )
1608             VALUES(?, ?)
1609             ",
1610        )?))
1611    }
1612
1613    fn execute(
1614        &mut self,
1615        suggestion_id: i64,
1616        wikipedia: &DownloadedWikipediaSuggestion,
1617    ) -> Result<()> {
1618        self.0
1619            .execute((suggestion_id, &wikipedia.icon_id))
1620            .with_context("wikipedia insert")?;
1621        Ok(())
1622    }
1623}
1624
1625struct AmoInsertStatement<'conn>(rusqlite::Statement<'conn>);
1626
1627impl<'conn> AmoInsertStatement<'conn> {
1628    fn new(conn: &'conn Connection) -> Result<Self> {
1629        Ok(Self(conn.prepare(
1630            "INSERT INTO amo_custom_details(
1631                 suggestion_id,
1632                 description,
1633                 guid,
1634                 icon_url,
1635                 rating,
1636                 number_of_ratings
1637             )
1638             VALUES(?, ?, ?, ?, ?, ?)
1639             ",
1640        )?))
1641    }
1642
1643    fn execute(&mut self, suggestion_id: i64, amo: &DownloadedAmoSuggestion) -> Result<()> {
1644        self.0
1645            .execute((
1646                suggestion_id,
1647                &amo.description,
1648                &amo.guid,
1649                &amo.icon_url,
1650                &amo.rating,
1651                amo.number_of_ratings,
1652            ))
1653            .with_context("amo insert")?;
1654        Ok(())
1655    }
1656}
1657
1658struct MdnInsertStatement<'conn>(rusqlite::Statement<'conn>);
1659
1660impl<'conn> MdnInsertStatement<'conn> {
1661    fn new(conn: &'conn Connection) -> Result<Self> {
1662        Ok(Self(conn.prepare(
1663            "INSERT INTO mdn_custom_details(
1664                 suggestion_id,
1665                 description
1666             )
1667             VALUES(?, ?)
1668             ",
1669        )?))
1670    }
1671
1672    fn execute(&mut self, suggestion_id: i64, mdn: &DownloadedMdnSuggestion) -> Result<()> {
1673        self.0
1674            .execute((suggestion_id, &mdn.description))
1675            .with_context("mdn insert")?;
1676        Ok(())
1677    }
1678}
1679
1680struct FakespotInsertStatement<'conn>(rusqlite::Statement<'conn>);
1681
1682impl<'conn> FakespotInsertStatement<'conn> {
1683    fn new(conn: &'conn Connection) -> Result<Self> {
1684        Ok(Self(conn.prepare(
1685            "INSERT INTO fakespot_custom_details(
1686                 suggestion_id,
1687                 fakespot_grade,
1688                 product_id,
1689                 keywords,
1690                 product_type,
1691                 rating,
1692                 total_reviews,
1693                 icon_id
1694             )
1695             VALUES(?, ?, ?, ?, ?, ?, ?, ?)
1696             ",
1697        )?))
1698    }
1699
1700    fn execute(
1701        &mut self,
1702        suggestion_id: i64,
1703        fakespot: &DownloadedFakespotSuggestion,
1704    ) -> Result<()> {
1705        let icon_id = fakespot
1706            .product_id
1707            .split_once('-')
1708            .map(|(vendor, _)| format!("fakespot-{vendor}"));
1709        self.0
1710            .execute((
1711                suggestion_id,
1712                &fakespot.fakespot_grade,
1713                &fakespot.product_id,
1714                &fakespot.keywords.to_lowercase(),
1715                &fakespot.product_type.to_lowercase(),
1716                fakespot.rating,
1717                fakespot.total_reviews,
1718                icon_id,
1719            ))
1720            .with_context("fakespot insert")?;
1721        Ok(())
1722    }
1723}
1724
1725struct DynamicInsertStatement<'conn>(rusqlite::Statement<'conn>);
1726
1727impl<'conn> DynamicInsertStatement<'conn> {
1728    fn new(conn: &'conn Connection) -> Result<Self> {
1729        Ok(Self(conn.prepare(
1730            "INSERT INTO dynamic_custom_details(
1731                 suggestion_id,
1732                 suggestion_type,
1733                 json_data
1734             )
1735             VALUES(?, ?, ?)
1736             ",
1737        )?))
1738    }
1739
1740    fn execute(
1741        &mut self,
1742        suggestion_id: i64,
1743        suggestion_type: &str,
1744        suggestion: &DownloadedDynamicSuggestion,
1745    ) -> Result<()> {
1746        self.0
1747            .execute((
1748                suggestion_id,
1749                suggestion_type,
1750                match &suggestion.data {
1751                    None => None,
1752                    Some(d) => Some(serde_json::to_string(&d)?),
1753                },
1754            ))
1755            .with_context("dynamic insert")?;
1756        Ok(())
1757    }
1758}
1759
1760pub(crate) struct KeywordInsertStatement<'conn>(rusqlite::Statement<'conn>);
1761
1762impl<'conn> KeywordInsertStatement<'conn> {
1763    pub(crate) fn new(conn: &'conn Connection) -> Result<Self> {
1764        Self::with_details(conn, "keywords", None)
1765    }
1766
1767    pub(crate) fn with_details(
1768        conn: &'conn Connection,
1769        table: &str,
1770        conflict_resolution: Option<InsertConflictResolution>,
1771    ) -> Result<Self> {
1772        Ok(Self(conn.prepare(&format!(
1773            r#"
1774            INSERT {} INTO {}(
1775                suggestion_id,
1776                keyword,
1777                full_keyword_id,
1778                rank
1779            )
1780            VALUES(?, ?, ?, ?)
1781            "#,
1782            conflict_resolution.as_ref().map(|r| r.as_str()).unwrap_or_default(),
1783            table,
1784        ))?))
1785    }
1786
1787    pub(crate) fn execute(
1788        &mut self,
1789        suggestion_id: i64,
1790        keyword: &str,
1791        full_keyword_id: Option<i64>,
1792        rank: usize,
1793    ) -> Result<()> {
1794        self.0
1795            .execute((suggestion_id, keyword, full_keyword_id, rank))
1796            .with_context("keyword insert")?;
1797        Ok(())
1798    }
1799}
1800
1801pub(crate) struct CategoryInsertStatement<'conn>(rusqlite::Statement<'conn>);
1802
1803impl<'conn> CategoryInsertStatement<'conn> {
1804    pub(crate) fn new(conn: &'conn Connection) -> Result<Self> {
1805        Self::with_details(conn, "serp_categories", None)
1806    }
1807
1808    pub(crate) fn with_details(
1809        conn: &'conn Connection,
1810        table: &str,
1811        conflict_resolution: Option<InsertConflictResolution>,
1812    ) -> Result<Self> {
1813        Ok(Self(conn.prepare(&format!(
1814            r#"
1815            INSERT OR REPLACE {} INTO {}(
1816                suggestion_id,
1817                category
1818            )
1819            VALUES(?, ?)
1820            "#,
1821            conflict_resolution.as_ref().map(|r| r.as_str()).unwrap_or_default(),
1822            table,
1823        ))?))
1824    }
1825
1826    pub(crate) fn execute(&mut self, suggestion_id: i64, category: i32) -> Result<()> {
1827        self.0
1828            .execute((suggestion_id, category))
1829            .with_context("category insert")?;
1830        Ok(())
1831    }
1832}
1833
1834pub(crate) enum InsertConflictResolution {
1835    Ignore,
1836}
1837
1838impl InsertConflictResolution {
1839    fn as_str(&self) -> &str {
1840        match self {
1841            InsertConflictResolution::Ignore => "OR IGNORE",
1842        }
1843    }
1844}
1845
1846struct PrefixKeywordInsertStatement<'conn>(rusqlite::Statement<'conn>);
1847
1848impl<'conn> PrefixKeywordInsertStatement<'conn> {
1849    fn new(conn: &'conn Connection) -> Result<Self> {
1850        Ok(Self(conn.prepare(
1851            "INSERT INTO prefix_keywords(
1852                 suggestion_id,
1853                 confidence,
1854                 keyword_prefix,
1855                 keyword_suffix,
1856                 rank
1857             )
1858             VALUES(?, ?, ?, ?, ?)
1859             ",
1860        )?))
1861    }
1862
1863    fn execute(
1864        &mut self,
1865        suggestion_id: i64,
1866        confidence: Option<u8>,
1867        keyword_prefix: &str,
1868        keyword_suffix: &str,
1869        rank: usize,
1870    ) -> Result<()> {
1871        self.0
1872            .execute((
1873                suggestion_id,
1874                confidence.unwrap_or(0),
1875                keyword_prefix,
1876                keyword_suffix,
1877                rank,
1878            ))
1879            .with_context("prefix keyword insert")?;
1880        Ok(())
1881    }
1882}
1883
1884/// Metrics for a set of keywords. Use `KeywordsMetricsUpdater` to write metrics
1885/// to the store and `SuggestDao::get_keywords_metrics` to read them.
1886#[derive(Debug, Default, Eq, PartialEq)]
1887pub(crate) struct KeywordsMetrics {
1888    /// The max byte count (not chars) of all keywords in the set.
1889    pub(crate) max_len: usize,
1890    /// The max word count of all keywords in the set.
1891    pub(crate) max_word_count: usize,
1892}
1893
1894/// This can be used to keep a running tally of metrics as you process keywords
1895/// and then to write the metrics to the store. Make a `KeywordsMetricsUpdater`,
1896/// call `update` on it as you process each keyword, and then call `finish` to
1897/// write the metrics to the store.
1898pub(crate) struct KeywordsMetricsUpdater {
1899    metrics: KeywordsMetrics,
1900}
1901
1902impl KeywordsMetricsUpdater {
1903    pub(crate) fn new() -> Self {
1904        Self {
1905            metrics: KeywordsMetrics::default(),
1906        }
1907    }
1908
1909    /// Updates the metrics with those of the given keyword.
1910    ///
1911    /// The keyword's metrics are calculated as the max of its metrics as-is and
1912    /// its metrics when it's transformed using `i18n_transform`. For example,
1913    /// `"Qu\u{00e9}bec"` ("Québec" with single two-byte 'é' char) has 7 bytes,
1914    /// so its `len` is 7, but the `len` of `i18n_transform("Qu\u{00e9}bec")` is
1915    /// 6, since the 'é' is transformed to an ASCII 'e'. The keyword's `max_len`
1916    /// will therefore be the max of 7 and 6, which is 7.
1917    pub(crate) fn update(&mut self, keyword: &str) {
1918        let transformed_kw = i18n_transform(keyword);
1919        self.metrics.max_len = std::cmp::max(
1920            self.metrics.max_len,
1921            std::cmp::max(transformed_kw.len(), keyword.len()),
1922        );
1923
1924        // Getting the word count is costly, so we get only the transformed word
1925        // count under the assumption that it will always be at least as large
1926        // as the the original word count. That's currently true because the
1927        // only relevant transform that `i18n_transform` performs is to replace
1928        // hyphens with spaces.
1929        self.metrics.max_word_count = std::cmp::max(
1930            self.metrics.max_word_count,
1931            transformed_kw.split_whitespace().count(),
1932        );
1933    }
1934
1935    /// Inserts keywords metrics into the database. This assumes you have a
1936    /// cache object inside the `cache` cell that caches the metrics. It will be
1937    /// cleared since it will be invalidated by the metrics update.
1938    pub(crate) fn finish<T>(
1939        &self,
1940        conn: &Connection,
1941        record_id: &SuggestRecordId,
1942        record_type: SuggestRecordType,
1943        cache: &mut OnceCell<T>,
1944    ) -> Result<()> {
1945        let mut insert_stmt = conn.prepare(
1946            r#"
1947            INSERT OR REPLACE INTO keywords_metrics(
1948                record_id,
1949                record_type,
1950                max_len,
1951                max_word_count
1952            )
1953            VALUES(?, ?, ?, ?)
1954            "#,
1955        )?;
1956        insert_stmt
1957            .execute((
1958                record_id.as_str(),
1959                record_type,
1960                self.metrics.max_len,
1961                self.metrics.max_word_count,
1962            ))
1963            .with_context("keywords metrics insert")?;
1964
1965        // We just made some insertions that might invalidate the data in the
1966        // cache. Clear it so it's repopulated the next time it's accessed.
1967        cache.take();
1968
1969        Ok(())
1970    }
1971}
1972
1973pub(crate) struct AmpFtsInsertStatement<'conn>(rusqlite::Statement<'conn>);
1974
1975impl<'conn> AmpFtsInsertStatement<'conn> {
1976    pub(crate) fn new(conn: &'conn Connection) -> Result<Self> {
1977        Ok(Self(conn.prepare(
1978            "INSERT INTO amp_fts(rowid, full_keywords, title)
1979             VALUES(?, ?, ?)
1980             ",
1981        )?))
1982    }
1983
1984    pub(crate) fn execute(
1985        &mut self,
1986        suggestion_id: i64,
1987        full_keywords: &str,
1988        title: &str,
1989    ) -> Result<()> {
1990        self.0
1991            .execute((suggestion_id, full_keywords, title))
1992            .with_context("amp fts insert")?;
1993        Ok(())
1994    }
1995}
1996
1997fn provider_config_meta_key(provider: SuggestionProvider) -> String {
1998    format!("{}{}", PROVIDER_CONFIG_META_KEY_PREFIX, provider as u8)
1999}
2000
2001#[cfg(test)]
2002mod tests {
2003    use super::*;
2004    use crate::{store::tests::TestStore, testing::*, SuggestIngestionConstraints};
2005
2006    #[test]
2007    fn keywords_metrics_updater() -> anyhow::Result<()> {
2008        // (test keyword, expected `KeywordsMetrics`)
2009        //
2010        // Tests are cumulative!
2011        let tests = [
2012            (
2013                "abc",
2014                KeywordsMetrics {
2015                    max_len: 3,
2016                    max_word_count: 1,
2017                },
2018            ),
2019            (
2020                "a b",
2021                KeywordsMetrics {
2022                    max_len: 3,
2023                    max_word_count: 2,
2024                },
2025            ),
2026            (
2027                "a b c",
2028                KeywordsMetrics {
2029                    max_len: 5,
2030                    max_word_count: 3,
2031                },
2032            ),
2033            // "Québec" with single 'é' char:
2034            // With `i18n_transform`: len = 6
2035            // Without `i18n_transform`: len = 7
2036            // => Length for this string should be max(6, 7) = 7, which is a new
2037            //    overall max
2038            (
2039                "Qu\u{00e9}bec",
2040                KeywordsMetrics {
2041                    max_len: 7,
2042                    max_word_count: 3,
2043                },
2044            ),
2045            // "Québec" with ASCII 'e' followed by combining acute accent:
2046            // With `i18n_transform`, len = 6
2047            // Without `i18n_transform`, len = 8
2048            // => Length for this string should be max(6, 8) = 8, which is a new
2049            //    overall max
2050            (
2051                "Que\u{0301}bec",
2052                KeywordsMetrics {
2053                    max_len: 8,
2054                    max_word_count: 3,
2055                },
2056            ),
2057            // Each '-' should be replaced with a space, so the word count for
2058            // this string should be 4, which is a new overall max
2059            (
2060                "Carmel-by-the-Sea",
2061                KeywordsMetrics {
2062                    max_len: 17,
2063                    max_word_count: 4,
2064                },
2065            ),
2066        ];
2067
2068        // Make an updater and call it with each test in turn.
2069        let mut updater = KeywordsMetricsUpdater::new();
2070        for (test_kw, expected_metrics) in &tests {
2071            updater.update(test_kw);
2072            assert_eq!(&updater.metrics, expected_metrics);
2073        }
2074
2075        // Make a store and finish the updater so it updates the metrics table.
2076        let store = TestStore::new(MockRemoteSettingsClient::default());
2077        store.write(|dao| {
2078            // Make a dummy cache cell. It doesn't matter what it contains, we
2079            // just want to make sure it's empty after finishing the updater.
2080            let mut dummy_cache = OnceCell::new();
2081            dummy_cache.set("test").expect("dummy cache set");
2082            assert_ne!(dummy_cache.get(), None);
2083
2084            let record_type = SuggestRecordType::Wikipedia;
2085            updater.finish(
2086                dao.conn,
2087                &SuggestRecordId::new("test-record-1".to_string()),
2088                record_type,
2089                &mut dummy_cache,
2090            )?;
2091
2092            assert_eq!(dummy_cache.get(), None);
2093
2094            // Read back the metrics and make sure they match the ones in the
2095            // final test.
2096            let read_metrics_1 = dao.get_keywords_metrics(record_type)?;
2097            assert_eq!(read_metrics_1, tests.last().unwrap().1);
2098
2099            // Update the metrics one more time and finish them again.
2100            updater.update("a very long keyword with many words");
2101            let new_expected = KeywordsMetrics {
2102                max_len: 35,
2103                max_word_count: 7,
2104            };
2105            assert_eq!(updater.metrics, new_expected);
2106
2107            updater.finish(
2108                dao.conn,
2109                &SuggestRecordId::new("test-record-2".to_string()),
2110                record_type,
2111                &mut dummy_cache,
2112            )?;
2113
2114            // Read back the new metrics and make sure they match.
2115            let read_metrics_2 = dao.get_keywords_metrics(record_type)?;
2116            assert_eq!(read_metrics_2, new_expected);
2117
2118            Ok(())
2119        })?;
2120
2121        Ok(())
2122    }
2123
2124    // Keywords in `keywords_i18n` should be dropped when their records are
2125    // deleted.
2126    #[test]
2127    fn keywords_i18n_delete_record() -> anyhow::Result<()> {
2128        // Add some records whose keywords are stored in `keywords_i18n`. We'll
2129        // use weather records.
2130        let kws_1 = ["aaa", "bbb", "ccc"];
2131        let kws_2 = ["yyy", "zzz"];
2132        let mut store = TestStore::new(
2133            MockRemoteSettingsClient::default()
2134                .with_record(SuggestionProvider::Weather.record(
2135                    "weather-1",
2136                    json!({
2137                        "score": 0.24,
2138                        "keywords": kws_1,
2139                    }),
2140                ))
2141                .with_record(SuggestionProvider::Weather.record(
2142                    "weather-2",
2143                    json!({
2144                        "score": 0.24,
2145                        "keywords": kws_2,
2146                    }),
2147                )),
2148        );
2149        store.ingest(SuggestIngestionConstraints {
2150            providers: Some(vec![SuggestionProvider::Weather]),
2151            ..SuggestIngestionConstraints::all_providers()
2152        });
2153
2154        // Make sure all keywords are present.
2155        assert_eq!(
2156            store.count_rows("keywords_i18n") as usize,
2157            kws_1.len() + kws_2.len()
2158        );
2159
2160        for q in kws_1.iter().chain(kws_2.iter()) {
2161            assert_eq!(
2162                store.fetch_suggestions(SuggestionQuery::weather(q)),
2163                vec![Suggestion::Weather {
2164                    score: 0.24,
2165                    city: None,
2166                }],
2167                "query: {:?}",
2168                q
2169            );
2170        }
2171
2172        // Delete the first record.
2173        store
2174            .client_mut()
2175            .delete_record(SuggestionProvider::Weather.empty_record("weather-1"));
2176        store.ingest(SuggestIngestionConstraints {
2177            providers: Some(vec![SuggestionProvider::Weather]),
2178            ..SuggestIngestionConstraints::all_providers()
2179        });
2180
2181        // Its keywords should be dropped and the keywords from the second
2182        // record should still be present.
2183        assert_eq!(store.count_rows("keywords_i18n") as usize, kws_2.len());
2184
2185        for q in kws_1 {
2186            assert_eq!(
2187                store.fetch_suggestions(SuggestionQuery::weather(q)),
2188                vec![],
2189                "query: {:?}",
2190                q
2191            );
2192        }
2193        for q in kws_2 {
2194            assert_eq!(
2195                store.fetch_suggestions(SuggestionQuery::weather(q)),
2196                vec![Suggestion::Weather {
2197                    score: 0.24,
2198                    city: None,
2199                }],
2200                "query: {:?}",
2201                q
2202            );
2203        }
2204
2205        Ok(())
2206    }
2207
2208    // Keywords in `keywords_i18n` should be dropped when their records are
2209    // updated and the keywords are no longer present, and new keywords should
2210    // be added.
2211    #[test]
2212    fn keywords_i18n_update_record() -> anyhow::Result<()> {
2213        // Add some records whose keywords are stored in `keywords_i18n`. We'll
2214        // use weather records.
2215        let kws_1 = ["aaa", "bbb", "ccc"];
2216        let kws_2 = ["yyy", "zzz"];
2217        let mut store = TestStore::new(
2218            MockRemoteSettingsClient::default()
2219                .with_record(SuggestionProvider::Weather.record(
2220                    "weather-1",
2221                    json!({
2222                        "score": 0.24,
2223                        "keywords": kws_1,
2224                    }),
2225                ))
2226                .with_record(SuggestionProvider::Weather.record(
2227                    "weather-2",
2228                    json!({
2229                        "score": 0.24,
2230                        "keywords": kws_2,
2231                    }),
2232                )),
2233        );
2234        store.ingest(SuggestIngestionConstraints {
2235            providers: Some(vec![SuggestionProvider::Weather]),
2236            ..SuggestIngestionConstraints::all_providers()
2237        });
2238
2239        // Make sure all keywords are present.
2240        assert_eq!(
2241            store.count_rows("keywords_i18n") as usize,
2242            kws_1.len() + kws_2.len()
2243        );
2244
2245        for q in kws_1.iter().chain(kws_2.iter()) {
2246            assert_eq!(
2247                store.fetch_suggestions(SuggestionQuery::weather(q)),
2248                vec![Suggestion::Weather {
2249                    score: 0.24,
2250                    city: None,
2251                }],
2252                "query: {:?}",
2253                q
2254            );
2255        }
2256
2257        // Update the first record.
2258        let kws_1_new = [
2259            "bbb", // keyword from the old record
2260            "mmm", // new keyword
2261        ];
2262        store
2263            .client_mut()
2264            .update_record(SuggestionProvider::Weather.record(
2265                "weather-1",
2266                json!({
2267                    "score": 0.24,
2268                    "keywords": kws_1_new,
2269                }),
2270            ));
2271        store.ingest(SuggestIngestionConstraints {
2272            providers: Some(vec![SuggestionProvider::Weather]),
2273            ..SuggestIngestionConstraints::all_providers()
2274        });
2275
2276        // Check all keywords.
2277        assert_eq!(
2278            store.count_rows("keywords_i18n") as usize,
2279            kws_1_new.len() + kws_2.len()
2280        );
2281
2282        for q in ["aaa", "ccc"] {
2283            assert_eq!(
2284                store.fetch_suggestions(SuggestionQuery::weather(q)),
2285                vec![],
2286                "query: {:?}",
2287                q
2288            );
2289        }
2290        for q in kws_1_new.iter().chain(kws_2.iter()) {
2291            assert_eq!(
2292                store.fetch_suggestions(SuggestionQuery::weather(q)),
2293                vec![Suggestion::Weather {
2294                    score: 0.24,
2295                    city: None,
2296                }],
2297                "query: {:?}",
2298                q
2299            );
2300        }
2301
2302        Ok(())
2303    }
2304}