sql_support/
conn_ext.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use rusqlite::{
6    self,
7    types::{FromSql, ToSql},
8    Connection, Params, Result as SqlResult, Row, Savepoint, Transaction, TransactionBehavior,
9};
10use std::iter::FromIterator;
11use std::ops::Deref;
12use std::time::Instant;
13
14use crate::maybe_cached::MaybeCached;
15use crate::{debug, warn};
16
17/// This trait exists so that we can use these helpers on `rusqlite::{Transaction, Connection}`.
18/// Note that you must import ConnExt in order to call these methods on anything.
19pub trait ConnExt {
    /// The method you need to implement to opt in to all of this.
    ///
    /// Returns the `rusqlite::Connection` that every provided helper on this
    /// trait runs its SQL against.
    fn conn(&self) -> &Connection;
22
23    /// Set the value of the pragma on the main database. Returns the same object, for chaining.
24    fn set_pragma<T>(&self, pragma_name: &str, pragma_value: T) -> SqlResult<&Self>
25    where
26        T: ToSql,
27        Self: Sized,
28    {
29        // None == Schema name, e.g. `PRAGMA some_attached_db.something = blah`
30        self.conn()
31            .pragma_update(None, pragma_name, &pragma_value)?;
32        Ok(self)
33    }
34
35    /// Get a cached or uncached statement based on a flag.
36    fn prepare_maybe_cached<'conn>(
37        &'conn self,
38        sql: &str,
39        cache: bool,
40    ) -> SqlResult<MaybeCached<'conn>> {
41        MaybeCached::prepare(self.conn(), sql, cache)
42    }
43
44    /// Execute all the provided statements.
45    fn execute_all(&self, stmts: &[&str]) -> SqlResult<()> {
46        let conn = self.conn();
47        for sql in stmts {
48            let r = conn.execute(sql, []);
49            match r {
50                Ok(_) => {}
51                // Ignore ExecuteReturnedResults error because they're pointless
52                // and annoying.
53                Err(rusqlite::Error::ExecuteReturnedResults) => {}
54                Err(e) => return Err(e),
55            }
56        }
57        Ok(())
58    }
59
60    /// Execute a single statement.
61    fn execute_one(&self, stmt: &str) -> SqlResult<()> {
62        self.execute_all(&[stmt])
63    }
64
65    /// Equivalent to `Connection::execute` but caches the statement so that subsequent
66    /// calls to `execute_cached` will have improved performance.
67    fn execute_cached<P: Params>(&self, sql: &str, params: P) -> SqlResult<usize> {
68        let mut stmt = self.conn().prepare_cached(sql)?;
69        stmt.execute(params)
70    }
71
72    /// Execute a query that returns a single result column, and return that result.
73    fn query_one<T: FromSql>(&self, sql: &str) -> SqlResult<T> {
74        let res: T = self.conn().query_row_and_then(sql, [], |row| row.get(0))?;
75        Ok(res)
76    }
77
78    /// Return true if a query returns any rows
79    fn exists<P: Params>(&self, sql: &str, params: P) -> SqlResult<bool> {
80        let conn = self.conn();
81        let mut stmt = conn.prepare(sql)?;
82        let exists = stmt.query(params)?.next()?.is_some();
83        Ok(exists)
84    }
85
86    /// Execute a query that returns 0 or 1 result columns, returning None
87    /// if there were no rows, or if the only result was NULL.
88    fn try_query_one<T: FromSql, P: Params>(
89        &self,
90        sql: &str,
91        params: P,
92        cache: bool,
93    ) -> SqlResult<Option<T>>
94    where
95        Self: Sized,
96    {
97        use rusqlite::OptionalExtension;
98        // The outer option is if we got rows, the inner option is
99        // if the first row was null.
100        let res: Option<Option<T>> = self
101            .conn()
102            .query_row_and_then_cachable(sql, params, |row| row.get(0), cache)
103            .optional()?;
104        // go from Option<Option<T>> to Option<T>
105        Ok(res.unwrap_or_default())
106    }
107
108    /// Equivalent to `rusqlite::Connection::query_row_and_then` but allows
109    /// passing a flag to indicate that it's cached.
110    fn query_row_and_then_cachable<T, E, P, F>(
111        &self,
112        sql: &str,
113        params: P,
114        mapper: F,
115        cache: bool,
116    ) -> Result<T, E>
117    where
118        Self: Sized,
119        P: Params,
120        E: From<rusqlite::Error>,
121        F: FnOnce(&Row<'_>) -> Result<T, E>,
122    {
123        Ok(self
124            .try_query_row(sql, params, mapper, cache)?
125            .ok_or(rusqlite::Error::QueryReturnedNoRows)?)
126    }
127
128    /// Helper for when you'd like to get a `Vec<T>` of all the rows returned by a
129    /// query that takes named arguments. See also
130    /// `query_rows_and_then_cached`.
131    fn query_rows_and_then<T, E, P, F>(&self, sql: &str, params: P, mapper: F) -> Result<Vec<T>, E>
132    where
133        Self: Sized,
134        P: Params,
135        E: From<rusqlite::Error>,
136        F: FnMut(&Row<'_>) -> Result<T, E>,
137    {
138        query_rows_and_then_cachable(self.conn(), sql, params, mapper, false)
139    }
140
141    /// Helper for when you'd like to get a `Vec<T>` of all the rows returned by a
142    /// query that takes named arguments.
143    fn query_rows_and_then_cached<T, E, P, F>(
144        &self,
145        sql: &str,
146        params: P,
147        mapper: F,
148    ) -> Result<Vec<T>, E>
149    where
150        Self: Sized,
151        P: Params,
152        E: From<rusqlite::Error>,
153        F: FnMut(&Row<'_>) -> Result<T, E>,
154    {
155        query_rows_and_then_cachable(self.conn(), sql, params, mapper, true)
156    }
157
158    /// Like `query_rows_and_then_cachable`, but works if you want a non-Vec as a result.
159    /// # Example:
160    /// ```rust,no_run
161    /// # use std::collections::HashSet;
162    /// # use sql_support::ConnExt;
163    /// # use rusqlite::Connection;
164    /// fn get_visit_tombstones(conn: &Connection, id: i64) -> rusqlite::Result<HashSet<i64>> {
165    ///     Ok(conn.query_rows_into(
166    ///         "SELECT visit_date FROM moz_historyvisit_tombstones
167    ///          WHERE place_id = :place_id",
168    ///         &[(":place_id", &id)],
169    ///         |row| row.get::<_, i64>(0))?)
170    /// }
171    /// ```
172    /// Note if the type isn't inferred, you'll have to do something gross like
173    /// `conn.query_rows_into::<HashSet<_>, _, _, _>(...)`.
174    fn query_rows_into<Coll, T, E, P, F>(&self, sql: &str, params: P, mapper: F) -> Result<Coll, E>
175    where
176        Self: Sized,
177        E: From<rusqlite::Error>,
178        F: FnMut(&Row<'_>) -> Result<T, E>,
179        Coll: FromIterator<T>,
180        P: Params,
181    {
182        query_rows_and_then_cachable(self.conn(), sql, params, mapper, false)
183    }
184
185    /// Same as `query_rows_into`, but caches the stmt if possible.
186    fn query_rows_into_cached<Coll, T, E, P, F>(
187        &self,
188        sql: &str,
189        params: P,
190        mapper: F,
191    ) -> Result<Coll, E>
192    where
193        Self: Sized,
194        P: Params,
195        E: From<rusqlite::Error>,
196        F: FnMut(&Row<'_>) -> Result<T, E>,
197        Coll: FromIterator<T>,
198    {
199        query_rows_and_then_cachable(self.conn(), sql, params, mapper, true)
200    }
201
202    // This should probably have a longer name...
203    /// Like `query_row_and_then_cacheable` but returns None instead of erroring
204    /// if no such row exists.
205    fn try_query_row<T, E, P, F>(
206        &self,
207        sql: &str,
208        params: P,
209        mapper: F,
210        cache: bool,
211    ) -> Result<Option<T>, E>
212    where
213        Self: Sized,
214        P: Params,
215        E: From<rusqlite::Error>,
216        F: FnOnce(&Row<'_>) -> Result<T, E>,
217    {
218        let conn = self.conn();
219        let mut stmt = MaybeCached::prepare(conn, sql, cache)?;
220        let mut rows = stmt.query(params)?;
221        rows.next()?.map(mapper).transpose()
222    }
223
224    /// Caveat: This won't actually get used most of the time, and calls will
225    /// usually invoke rusqlite's method with the same name. See comment on
226    /// `UncheckedTransaction` for details (generally you probably don't need to
227    /// care)
228    fn unchecked_transaction(&self) -> SqlResult<UncheckedTransaction<'_>> {
229        UncheckedTransaction::new(self.conn(), TransactionBehavior::Deferred)
230    }
231
232    /// Begin `unchecked_transaction` with `TransactionBehavior::Immediate`. Use
233    /// when the first operation will be a read operation, that further writes
234    /// depend on for correctness.
235    fn unchecked_transaction_imm(&self) -> SqlResult<UncheckedTransaction<'_>> {
236        UncheckedTransaction::new(self.conn(), TransactionBehavior::Immediate)
237    }
238
239    /// Get the DB size in bytes
240    fn get_db_size(&self) -> Result<u32, rusqlite::Error> {
241        let page_count: u32 = self.query_one("SELECT * from pragma_page_count()")?;
242        let page_size: u32 = self.query_one("SELECT * from pragma_page_size()")?;
243        let freelist_count: u32 = self.query_one("SELECT * from pragma_freelist_count()")?;
244
245        Ok((page_count - freelist_count) * page_size)
246    }
247}
248
/// A plain `Connection` is its own connection, so it gets every `ConnExt`
/// helper directly.
impl ConnExt for Connection {
    #[inline]
    fn conn(&self) -> &Connection {
        self
    }
}
255
256impl ConnExt for Transaction<'_> {
257    #[inline]
258    fn conn(&self) -> &Connection {
259        self
260    }
261}
262
263impl ConnExt for Savepoint<'_> {
264    #[inline]
265    fn conn(&self) -> &Connection {
266        self
267    }
268}
269
/// rusqlite, in an attempt to save us from ourselves, needs a mutable ref to a
/// connection to start a transaction. That is a bit of a PITA in some cases, so
/// we offer this as an alternative - but the responsibility of ensuring there
/// are no concurrent transactions is on our head.
///
/// This is very similar to the rusqlite `Transaction` - it doesn't protect
/// against nested transactions but does allow you to use an immutable
/// `Connection`.
///
/// FIXME: This currently won't actually be used most of the time, because
/// `rusqlite` added [`Connection::unchecked_transaction`] (and
/// `Transaction::new_unchecked`, which can be used to reimplement
/// `unchecked_transaction_imm`), which will be preferred in a call to
/// `c.unchecked_transaction()`, because inherent methods have precedence over
/// methods on extension traits. The exception here is that this will still be
/// used by code which takes `&impl ConnExt` (I believe it would also be used if
/// you attempted to call `unchecked_transaction()` on a non-Connection that
/// implements ConnExt, such as a `Savepoint`, `UncheckedTransaction`, or
/// `Transaction` itself, but such code is clearly broken, so is not worth
/// considering).
///
/// The difference is that `rusqlite`'s version returns a normal
/// `rusqlite::Transaction`, rather than the `UncheckedTransaction` from this
/// crate. Aside from the type's name and location (and the fact that
/// `rusqlite`'s detects slightly more misuse at compile time, and has more
/// features), the main difference is: `rusqlite`'s does not track when a
/// transaction began, which unfortunately seems to be used by the
/// coop-transaction management in places in some fashion.
///
/// There are at least two options for how to fix this:
/// 1. Decide we don't need this version, and delete it, moving the
///    transaction timing into the coop-transaction code directly (or something
///    like this).
/// 2. Decide this difference *is* important, and rename
///    `ConnExt::unchecked_transaction` to something like
///    `ConnExt::transaction_unchecked`.
pub struct UncheckedTransaction<'conn> {
    /// The connection the transaction was begun on; all SQL issued through
    /// this type runs against it.
    pub conn: &'conn Connection,
    /// When the transaction was started (recorded right after `BEGIN`
    /// succeeded); used to log how long it took to commit.
    pub started_at: Instant,
    /// Whether the transaction has already been committed or rolled back.
    /// Dropping a finished transaction does not roll back again.
    pub finished: bool,
    // we could add drop_behavior etc too, but we don't need it yet - we
    // always rollback.
}
313
314impl<'conn> UncheckedTransaction<'conn> {
315    /// Begin a new unchecked transaction. Cannot be nested, but this is not
316    /// enforced by Rust (hence 'unchecked') - however, it is enforced by
317    /// SQLite; use a rusqlite `savepoint` for nested transactions.
318    pub fn new(conn: &'conn Connection, behavior: TransactionBehavior) -> SqlResult<Self> {
319        let query = match behavior {
320            TransactionBehavior::Deferred => "BEGIN DEFERRED",
321            TransactionBehavior::Immediate => "BEGIN IMMEDIATE",
322            TransactionBehavior::Exclusive => "BEGIN EXCLUSIVE",
323            _ => unreachable!(),
324        };
325        conn.execute_batch(query)
326            .map(move |_| UncheckedTransaction {
327                conn,
328                started_at: Instant::now(),
329                finished: false,
330            })
331    }
332
333    /// Consumes and commits an unchecked transaction.
334    pub fn commit(mut self) -> SqlResult<()> {
335        if self.finished {
336            warn!("ignoring request to commit an already finished transaction");
337            return Ok(());
338        }
339        self.finished = true;
340        self.conn.execute_batch("COMMIT")?;
341        debug!("Transaction commited after {:?}", self.started_at.elapsed());
342        Ok(())
343    }
344
345    /// Consumes and rolls back an unchecked transaction.
346    pub fn rollback(mut self) -> SqlResult<()> {
347        if self.finished {
348            warn!("ignoring request to rollback an already finished transaction");
349            return Ok(());
350        }
351        self.rollback_()
352    }
353
354    fn rollback_(&mut self) -> SqlResult<()> {
355        self.finished = true;
356        self.conn.execute_batch("ROLLBACK")?;
357        Ok(())
358    }
359
360    fn finish_(&mut self) -> SqlResult<()> {
361        if self.finished || self.conn.is_autocommit() {
362            return Ok(());
363        }
364        self.rollback_()?;
365        Ok(())
366    }
367}
368
369impl Deref for UncheckedTransaction<'_> {
370    type Target = Connection;
371
372    #[inline]
373    fn deref(&self) -> &Connection {
374        self.conn
375    }
376}
377
378impl Drop for UncheckedTransaction<'_> {
379    fn drop(&mut self) {
380        if let Err(e) = self.finish_() {
381            warn!("Error dropping an unchecked transaction: {}", e);
382        }
383    }
384}
385
386impl ConnExt for UncheckedTransaction<'_> {
387    #[inline]
388    fn conn(&self) -> &Connection {
389        self
390    }
391}
392
393fn query_rows_and_then_cachable<Coll, T, E, P, F>(
394    conn: &Connection,
395    sql: &str,
396    params: P,
397    mapper: F,
398    cache: bool,
399) -> Result<Coll, E>
400where
401    E: From<rusqlite::Error>,
402    F: FnMut(&Row<'_>) -> Result<T, E>,
403    Coll: FromIterator<T>,
404    P: Params,
405{
406    let mut stmt = conn.prepare_maybe_cached(sql, cache)?;
407    let iter = stmt.query_and_then(params, mapper)?;
408    iter.collect::<Result<Coll, E>>()
409}