places/db/tx/coop_transaction.rs
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

//! This implements "cooperative transactions" for places. It relies on our
//! decision to have exactly one general-purpose "writer" connection and
//! exactly one "sync writer" - ie, exactly 2 write connections.
//!
//! We'll describe the implementation and strategy, but note that most callers
//! should use `PlacesDb::begin_transaction()`, which will do the right thing
//! for your db type.
//!
//! The idea is that anything that uses the sync connection should use
//! `chunked_coop_transaction`. Code using this should regularly call
//! `maybe_commit()`, which, roughly once per second, will commit the current
//! transaction and start a new one.
//!
//! This means that in theory the other writable connection can start
//! transactions and should block for a maximum of one second - well under the
//! five seconds before that other writer fails with SQLITE_BUSY or a similar
//! error.
//!
//! However, in practice we see the writer thread being starved - even though
//! it's waiting for a lock, the sync thread still manages to re-acquire the
//! lock. In other words, the locks used by sqlite aren't "fair".
//!
//! So we mitigate this with a simple approach that works fine within our
//! "exactly 2 writers" constraint:
//! * Both write connections share a mutex.
//! * Before starting a transaction, each connection locks the mutex.
//! * It then starts an "immediate" transaction - because sqlite now holds a
//!   lock on our behalf, we release the mutex.
//!
//! In other words, the mutex is held only while obtaining the DB lock, then
//! immediately released.
//!
//! The end result here is that if either connection is waiting for the
//! database lock because the other already holds it, the waiting one is
//! guaranteed to get the database lock next.
//!
//! One additional wrinkle is that even if there were exactly one writer,
//! there's still a possibility of SQLITE_BUSY if the database is being
//! checkpointed. So we handle that case and perform exactly one retry.
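//!
//! As a rough sketch of that locking dance (`coop` and `conn` here stand in
//! for the shared mutex and the underlying connection; the real implementation
//! is `get_tx_with_retry_on_locked()` below, which also handles the retry):
//!
//! ```ignore
//! let guard = coop.lock();
//! // BEGIN IMMEDIATE takes the database write lock straight away...
//! let tx = UncheckedTransaction::new(conn, TransactionBehavior::Immediate)?;
//! // ...so the mutex can be released as soon as the transaction exists.
//! drop(guard);
//! ```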

use crate::api::places_api::ConnectionType;
use crate::db::PlacesDb;
use crate::error::*;
use parking_lot::Mutex;
use rusqlite::{Connection, TransactionBehavior};
use sql_support::{ConnExt, UncheckedTransaction};
use std::ops::Deref;
use std::time::{Duration, Instant};

impl PlacesDb {
    /// Begin a ChunkedCoopTransaction. Must be called from the
    /// sync connection, see the module doc for details.
    pub(super) fn chunked_coop_transaction(&self) -> Result<ChunkedCoopTransaction<'_>> {
        // Note: if there's actually a reason for the write conn to take this, we
        // can consider relaxing this. It's not required for correctness, it just
        // happens to be the right choice for everything we expose and plan on
        // exposing.
        assert_eq!(
            self.conn_type(),
            ConnectionType::Sync,
            "chunked_coop_transaction must only be called by the Sync connection"
        );
        // Note that we don't allow commit_after as a param because it
        // is closely related to the timeouts configured on the database
        // itself.
        let commit_after = Duration::from_millis(1000);
        ChunkedCoopTransaction::new(self.conn(), commit_after, &self.coop_tx_lock)
    }

    /// Begin a "coop" transaction. Must be called from the write connection,
    /// see the module doc for details.
    pub(super) fn coop_transaction(&self) -> Result<UncheckedTransaction<'_>> {
        // Only the general-purpose ReadWrite connection may take a plain coop
        // transaction.
        assert_eq!(
            self.conn_type(),
            ConnectionType::ReadWrite,
            "coop_transaction must only be called on the ReadWrite connection"
        );
        let _lock = self.coop_tx_lock.lock();
        get_tx_with_retry_on_locked(self.conn())
    }
}

/// This transaction is suitable for when a transaction is used purely for
/// performance reasons rather than for data-integrity reasons, or when it's
/// used for integrity but held longer than strictly necessary for performance
/// reasons (ie, when it could be multiple transactions and still guarantee
/// integrity). An example would be a transaction used purely for performance
/// when updating a large number of rows, where data integrity would still be
/// fine if the work was split into multiple, smaller transactions.
///
/// You should regularly call .maybe_commit() as part of your
/// processing, and if the current transaction has been open for longer than
/// some duration the transaction will be committed and another one
/// started. You should always call .commit() at the end. Note that .rollback()
/// only rolls back to the most recent `maybe_commit()` - it's very difficult
/// to work out what was previously committed and therefore what was rolled
/// back - so if you need to explicitly roll back everything, this probably
/// isn't what you should be using. Note that SQLite might roll back for its
/// own reasons though.
///
/// Note that this can still be used for transactions which ensure data
/// integrity. For example, if you are processing a large group of items, and
/// each individual item requires multiple updates, you will probably want to
/// ensure you call .maybe_commit() after every item rather than after
/// each individual database update.
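///
/// # Example
///
/// A rough sketch of the intended calling pattern (assuming `db` is the sync
/// connection's `PlacesDb`; the `items` iterator and `apply_item` function are
/// illustrative only, not part of this crate):
///
/// ```ignore
/// let mut tx = db.chunked_coop_transaction()?;
/// for item in items {
///     apply_item(&tx, item)?;
///     // Commits and re-opens the transaction if it has been held too long.
///     tx.maybe_commit()?;
/// }
/// tx.commit()?;
/// ```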
pub struct ChunkedCoopTransaction<'conn> {
    tx: UncheckedTransaction<'conn>,
    commit_after: Duration,
    coop: &'conn Mutex<()>,
}

impl<'conn> ChunkedCoopTransaction<'conn> {
    /// Begin a new transaction which may be split into multiple transactions
    /// for performance reasons. Cannot be nested - we don't enforce that here,
    /// but SQLite will; use a rusqlite `savepoint` if you need nested
    /// transactions.
    pub fn new(
        conn: &'conn Connection,
        commit_after: Duration,
        coop: &'conn Mutex<()>,
    ) -> Result<Self> {
        let _lock = coop.lock();
        let tx = get_tx_with_retry_on_locked(conn)?;
        Ok(Self {
            tx,
            commit_after,
            coop,
        })
    }

    /// Returns `true` if the current transaction has been open for longer than
    /// the requested time, and should be committed; `false` otherwise. In most
    /// cases, there's no need to use this method, since `maybe_commit()` does
    /// so internally. It's exposed for consumers that need to run additional
    /// pre-commit logic, like cleaning up temp tables.
    ///
    /// If this method returns `true`, it's guaranteed that `maybe_commit()`
    /// will commit the transaction.
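    ///
    /// A sketch of the "additional pre-commit logic" pattern mentioned above
    /// (`clear_temp_tables` is a hypothetical helper, not defined here):
    ///
    /// ```ignore
    /// if tx.should_commit() {
    ///     clear_temp_tables(&tx)?;
    /// }
    /// tx.maybe_commit()?;
    /// ```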
    #[inline]
    pub fn should_commit(&self) -> bool {
        self.tx.started_at.elapsed() >= self.commit_after
    }

    /// Checks to see if we have held a transaction for longer than the
    /// requested time, and if so, commits the current transaction and opens
    /// another.
    #[inline]
    pub fn maybe_commit(&mut self) -> Result<()> {
        if self.should_commit() {
            debug!("ChunkedCoopTransaction committing after taking allocated time");
            self.commit_and_start_new_tx()?;
        }
        Ok(())
    }

    fn commit_and_start_new_tx(&mut self) -> Result<()> {
        // We can't call self.tx.commit() here as it wants to consume
        // self.tx, and we can't set up the new self.tx first as then
        // we'll be trying to start a new transaction while the current
        // one is in progress. So explicitly set the finished flag on it.
        self.tx.finished = true;
        self.tx.execute_batch("COMMIT")?;
        // Acquire a lock on our cooperator - if our only other writer
        // thread holds a write lock we'll block until it is released.
        // Note however that sqlite might still return a locked error if the
        // database is being checkpointed - so we still perform exactly one
        // retry, which we do while we have the lock, because we don't want
        // our other write connection to win this race either.
        let _lock = self.coop.lock();
        self.tx = get_tx_with_retry_on_locked(self.tx.conn)?;
        Ok(())
    }

    /// Consumes and commits a ChunkedCoopTransaction.
    pub fn commit(self) -> Result<()> {
        self.tx.commit()?;
        Ok(())
    }

    /// Consumes and rolls back a ChunkedCoopTransaction - but note that this
    /// potentially only rolls back to the last `maybe_commit()`.
    pub fn rollback(self) -> Result<()> {
        self.tx.rollback()?;
        Ok(())
    }
}

impl Deref for ChunkedCoopTransaction<'_> {
    type Target = Connection;

    #[inline]
    fn deref(&self) -> &Connection {
        self.tx.conn
    }
}

impl ConnExt for ChunkedCoopTransaction<'_> {
    #[inline]
    fn conn(&self) -> &Connection {
        self
    }
}

// A helper that attempts to start an "immediate" transaction on the DB, which
// takes the write lock. If it fails with a "busy" or "locked" error, it does
// exactly one retry.
fn get_tx_with_retry_on_locked(conn: &Connection) -> Result<UncheckedTransaction<'_>> {
    let behavior = TransactionBehavior::Immediate;
    match UncheckedTransaction::new(conn, behavior) {
        Ok(tx) => Ok(tx),
        Err(rusqlite::Error::SqliteFailure(err, _))
            if err.code == rusqlite::ErrorCode::DatabaseBusy
                || err.code == rusqlite::ErrorCode::DatabaseLocked =>
        {
            // Retry the lock - we assume that this lock request still
            // blocks for the default busy timeout, so we don't need to sleep
            // etc.
            let started_at = Instant::now();
            warn!("Attempting to get a write lock failed - doing one retry");
            let tx = UncheckedTransaction::new(conn, behavior).inspect_err(|_err| {
                warn!("Retrying the lock failed after {:?}", started_at.elapsed());
            })?;
            info!("Retrying the lock worked after {:?}", started_at.elapsed());
            Ok(tx)
        }
        Err(e) => Err(e.into()),
    }
}