1use rusqlite::{
6 self,
7 types::{FromSql, ToSql},
8 Connection, Params, Result as SqlResult, Row, Savepoint, Transaction, TransactionBehavior,
9};
10use std::iter::FromIterator;
11use std::ops::Deref;
12use std::time::Instant;
13
14use crate::maybe_cached::MaybeCached;
15use crate::{debug, warn};
16
17pub trait ConnExt {
20 fn conn(&self) -> &Connection;
22
23 fn set_pragma<T>(&self, pragma_name: &str, pragma_value: T) -> SqlResult<&Self>
25 where
26 T: ToSql,
27 Self: Sized,
28 {
29 self.conn()
31 .pragma_update(None, pragma_name, &pragma_value)?;
32 Ok(self)
33 }
34
35 fn prepare_maybe_cached<'conn>(
37 &'conn self,
38 sql: &str,
39 cache: bool,
40 ) -> SqlResult<MaybeCached<'conn>> {
41 MaybeCached::prepare(self.conn(), sql, cache)
42 }
43
44 fn execute_all(&self, stmts: &[&str]) -> SqlResult<()> {
46 let conn = self.conn();
47 for sql in stmts {
48 let r = conn.execute(sql, []);
49 match r {
50 Ok(_) => {}
51 Err(rusqlite::Error::ExecuteReturnedResults) => {}
54 Err(e) => return Err(e),
55 }
56 }
57 Ok(())
58 }
59
60 fn execute_one(&self, stmt: &str) -> SqlResult<()> {
62 self.execute_all(&[stmt])
63 }
64
65 fn execute_cached<P: Params>(&self, sql: &str, params: P) -> SqlResult<usize> {
68 let mut stmt = self.conn().prepare_cached(sql)?;
69 stmt.execute(params)
70 }
71
72 fn query_one<T: FromSql>(&self, sql: &str) -> SqlResult<T> {
74 let res: T = self.conn().query_row_and_then(sql, [], |row| row.get(0))?;
75 Ok(res)
76 }
77
78 fn exists<P: Params>(&self, sql: &str, params: P) -> SqlResult<bool> {
80 let conn = self.conn();
81 let mut stmt = conn.prepare(sql)?;
82 let exists = stmt.query(params)?.next()?.is_some();
83 Ok(exists)
84 }
85
86 fn try_query_one<T: FromSql, P: Params>(
89 &self,
90 sql: &str,
91 params: P,
92 cache: bool,
93 ) -> SqlResult<Option<T>>
94 where
95 Self: Sized,
96 {
97 use rusqlite::OptionalExtension;
98 let res: Option<Option<T>> = self
101 .conn()
102 .query_row_and_then_cachable(sql, params, |row| row.get(0), cache)
103 .optional()?;
104 Ok(res.unwrap_or_default())
106 }
107
108 fn query_row_and_then_cachable<T, E, P, F>(
111 &self,
112 sql: &str,
113 params: P,
114 mapper: F,
115 cache: bool,
116 ) -> Result<T, E>
117 where
118 Self: Sized,
119 P: Params,
120 E: From<rusqlite::Error>,
121 F: FnOnce(&Row<'_>) -> Result<T, E>,
122 {
123 Ok(self
124 .try_query_row(sql, params, mapper, cache)?
125 .ok_or(rusqlite::Error::QueryReturnedNoRows)?)
126 }
127
128 fn query_rows_and_then<T, E, P, F>(&self, sql: &str, params: P, mapper: F) -> Result<Vec<T>, E>
132 where
133 Self: Sized,
134 P: Params,
135 E: From<rusqlite::Error>,
136 F: FnMut(&Row<'_>) -> Result<T, E>,
137 {
138 query_rows_and_then_cachable(self.conn(), sql, params, mapper, false)
139 }
140
141 fn query_rows_and_then_cached<T, E, P, F>(
144 &self,
145 sql: &str,
146 params: P,
147 mapper: F,
148 ) -> Result<Vec<T>, E>
149 where
150 Self: Sized,
151 P: Params,
152 E: From<rusqlite::Error>,
153 F: FnMut(&Row<'_>) -> Result<T, E>,
154 {
155 query_rows_and_then_cachable(self.conn(), sql, params, mapper, true)
156 }
157
158 fn query_rows_into<Coll, T, E, P, F>(&self, sql: &str, params: P, mapper: F) -> Result<Coll, E>
175 where
176 Self: Sized,
177 E: From<rusqlite::Error>,
178 F: FnMut(&Row<'_>) -> Result<T, E>,
179 Coll: FromIterator<T>,
180 P: Params,
181 {
182 query_rows_and_then_cachable(self.conn(), sql, params, mapper, false)
183 }
184
185 fn query_rows_into_cached<Coll, T, E, P, F>(
187 &self,
188 sql: &str,
189 params: P,
190 mapper: F,
191 ) -> Result<Coll, E>
192 where
193 Self: Sized,
194 P: Params,
195 E: From<rusqlite::Error>,
196 F: FnMut(&Row<'_>) -> Result<T, E>,
197 Coll: FromIterator<T>,
198 {
199 query_rows_and_then_cachable(self.conn(), sql, params, mapper, true)
200 }
201
202 fn try_query_row<T, E, P, F>(
206 &self,
207 sql: &str,
208 params: P,
209 mapper: F,
210 cache: bool,
211 ) -> Result<Option<T>, E>
212 where
213 Self: Sized,
214 P: Params,
215 E: From<rusqlite::Error>,
216 F: FnOnce(&Row<'_>) -> Result<T, E>,
217 {
218 let conn = self.conn();
219 let mut stmt = MaybeCached::prepare(conn, sql, cache)?;
220 let mut rows = stmt.query(params)?;
221 rows.next()?.map(mapper).transpose()
222 }
223
224 fn unchecked_transaction(&self) -> SqlResult<UncheckedTransaction<'_>> {
229 UncheckedTransaction::new(self.conn(), TransactionBehavior::Deferred)
230 }
231
232 fn unchecked_transaction_imm(&self) -> SqlResult<UncheckedTransaction<'_>> {
236 UncheckedTransaction::new(self.conn(), TransactionBehavior::Immediate)
237 }
238
239 fn get_db_size(&self) -> Result<u32, rusqlite::Error> {
241 let page_count: u32 = self.query_one("SELECT * from pragma_page_count()")?;
242 let page_size: u32 = self.query_one("SELECT * from pragma_page_size()")?;
243 let freelist_count: u32 = self.query_one("SELECT * from pragma_freelist_count()")?;
244
245 Ok((page_count - freelist_count) * page_size)
246 }
247}
248
249impl ConnExt for Connection {
250 #[inline]
251 fn conn(&self) -> &Connection {
252 self
253 }
254}
255
256impl ConnExt for Transaction<'_> {
257 #[inline]
258 fn conn(&self) -> &Connection {
259 self
260 }
261}
262
263impl ConnExt for Savepoint<'_> {
264 #[inline]
265 fn conn(&self) -> &Connection {
266 self
267 }
268}
269
270pub struct UncheckedTransaction<'conn> {
307 pub conn: &'conn Connection,
308 pub started_at: Instant,
309 pub finished: bool,
310 }
313
314impl<'conn> UncheckedTransaction<'conn> {
315 pub fn new(conn: &'conn Connection, behavior: TransactionBehavior) -> SqlResult<Self> {
319 let query = match behavior {
320 TransactionBehavior::Deferred => "BEGIN DEFERRED",
321 TransactionBehavior::Immediate => "BEGIN IMMEDIATE",
322 TransactionBehavior::Exclusive => "BEGIN EXCLUSIVE",
323 _ => unreachable!(),
324 };
325 conn.execute_batch(query)
326 .map(move |_| UncheckedTransaction {
327 conn,
328 started_at: Instant::now(),
329 finished: false,
330 })
331 }
332
333 pub fn commit(mut self) -> SqlResult<()> {
335 if self.finished {
336 warn!("ignoring request to commit an already finished transaction");
337 return Ok(());
338 }
339 self.finished = true;
340 self.conn.execute_batch("COMMIT")?;
341 debug!("Transaction commited after {:?}", self.started_at.elapsed());
342 Ok(())
343 }
344
345 pub fn rollback(mut self) -> SqlResult<()> {
347 if self.finished {
348 warn!("ignoring request to rollback an already finished transaction");
349 return Ok(());
350 }
351 self.rollback_()
352 }
353
354 fn rollback_(&mut self) -> SqlResult<()> {
355 self.finished = true;
356 self.conn.execute_batch("ROLLBACK")?;
357 Ok(())
358 }
359
360 fn finish_(&mut self) -> SqlResult<()> {
361 if self.finished || self.conn.is_autocommit() {
362 return Ok(());
363 }
364 self.rollback_()?;
365 Ok(())
366 }
367}
368
369impl Deref for UncheckedTransaction<'_> {
370 type Target = Connection;
371
372 #[inline]
373 fn deref(&self) -> &Connection {
374 self.conn
375 }
376}
377
378impl Drop for UncheckedTransaction<'_> {
379 fn drop(&mut self) {
380 if let Err(e) = self.finish_() {
381 warn!("Error dropping an unchecked transaction: {}", e);
382 }
383 }
384}
385
386impl ConnExt for UncheckedTransaction<'_> {
387 #[inline]
388 fn conn(&self) -> &Connection {
389 self
390 }
391}
392
393fn query_rows_and_then_cachable<Coll, T, E, P, F>(
394 conn: &Connection,
395 sql: &str,
396 params: P,
397 mapper: F,
398 cache: bool,
399) -> Result<Coll, E>
400where
401 E: From<rusqlite::Error>,
402 F: FnMut(&Row<'_>) -> Result<T, E>,
403 Coll: FromIterator<T>,
404 P: Params,
405{
406 let mut stmt = conn.prepare_maybe_cached(sql, cache)?;
407 let iter = stmt.query_and_then(params, mapper)?;
408 iter.collect::<Result<Coll, E>>()
409}