places/db/tx/
coop_transaction.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5//! This implements "cooperative transactions" for places. It relies on our
6//! decision to have exactly 1 general purpose "writer" connection and exactly
7//! one "sync writer" - ie, exactly 2 write connections.
8//!
9//! We'll describe the implementation and strategy, but note that most callers
10//! should use `PlacesDb::begin_transaction()`, which will do the right thing
11//! for your db type.
12//!
//! The idea is that anything that uses the sync connection should use
//! `chunked_coop_trransaction`. Code using this should regularly call
//! `maybe_commit()`, which - once the current transaction has been open for
//! a second - commits it and starts a new one.
17//!
18//! This means that in theory the other writable connection can start
19//! transactions and should block for a max of 1 second - well under the 5
20//! seconds before that other writer will fail with a SQLITE_BUSY or similar error.
21//!
22//! However, in practice we see the writer thread being starved - even though
23//! it's waiting for a lock, the sync thread still manages to re-get the lock.
24//! In other words, the locks used by sqlite aren't "fair".
25//!
26//! So we mitigate this with a simple approach that works fine within our
27//! "exactly 2 writers" constraints:
28//! * Each database connection shares a mutex.
29//! * Before starting a transaction, each connection locks the mutex.
30//! * It then starts an "immediate" transaction - because sqlite now holds a
31//!   lock on our behalf, we release the lock on the mutex.
32//!
33//! In other words, the lock is held only while obtaining the DB lock, then
34//! immediately released.
35//!
36//! The end result here is that if either connection is waiting for the
37//! database lock because the other already holds it, the waiting one is
38//! guaranteed to get the database lock next.
39//!
40//! One additional wrinkle here is that even if there was exactly one writer,
41//! there's still a possibility of SQLITE_BUSY if the database is being
42//! checkpointed. So we handle that case and perform exactly 1 retry.
43
44use crate::api::places_api::ConnectionType;
45use crate::db::PlacesDb;
46use crate::error::*;
47use parking_lot::Mutex;
48use rusqlite::{Connection, TransactionBehavior};
49use sql_support::{ConnExt, UncheckedTransaction};
50use std::ops::Deref;
51use std::time::{Duration, Instant};
52
53impl PlacesDb {
54    /// Begin a ChunkedCoopTransaction. Must be called from the
55    /// sync connection, see module doc for details.
56    pub(super) fn chunked_coop_trransaction(&self) -> Result<ChunkedCoopTransaction<'_>> {
57        // Note: if there's actually a reason for a write conn to take this, we
58        // can consider relaxing this. It's not required for correctness, just happens
59        // to be the right choice for everything we expose and plan on exposing.
60        assert_eq!(
61            self.conn_type(),
62            ConnectionType::Sync,
63            "chunked_coop_trransaction must only be called by the Sync connection"
64        );
65        // Note that we don't allow commit_after as a param because it
66        // is closely related to the timeouts configured on the database
67        // itself.
68        let commit_after = Duration::from_millis(1000);
69        ChunkedCoopTransaction::new(self.conn(), commit_after, &self.coop_tx_lock)
70    }
71
72    /// Begin a "coop" transaction. Must be called from the write connection, see
73    /// module doc for details.
74    pub(super) fn coop_transaction(&self) -> Result<UncheckedTransaction<'_>> {
75        // Only validate transaction types for ConnectionType::ReadWrite.
76        assert_eq!(
77            self.conn_type(),
78            ConnectionType::ReadWrite,
79            "coop_transaction must only be called on the ReadWrite connection"
80        );
81        let _lock = self.coop_tx_lock.lock();
82        get_tx_with_retry_on_locked(self.conn())
83    }
84}
85
/// This transaction is suitable for when a transaction is used purely for
/// performance reasons rather than for data-integrity reasons, or when it's
/// used for integrity but held longer than strictly necessary for performance
/// reasons (ie, when it could be multiple transactions and still guarantee
/// integrity.) Examples of this might be for performance when updating a larger
/// number of rows, but data integrity concerns could be addressed by using
/// multiple, smaller transactions.
///
/// You should regularly call .maybe_commit() as part of your
/// processing, and if the current transaction has been open for greater than
/// some duration the transaction will be committed and another one
/// started. You should always call .commit() at the end. Note that
/// .rollback() only rolls back the transaction that is currently open -
/// anything already committed by an earlier .maybe_commit() stays committed,
/// so it will be very difficult to work out what was previously committed
/// and therefore what was rolled back. If you need to explicitly roll back
/// all of your work, this probably isn't what you should be using. Note
/// that SQLite might rollback for its own reasons though.
///
/// Note that this can still be used for transactions which ensure data
/// integrity. For example, if you are processing a large group of items, and
/// each individual item requires multiple updates, you will probably want to
/// ensure you call .maybe_commit() after every item rather than after
/// each individual database update.
pub struct ChunkedCoopTransaction<'conn> {
    // The transaction that is currently open; swapped for a fresh one each
    // time a chunk is committed.
    tx: UncheckedTransaction<'conn>,
    // How long a transaction may stay open before `should_commit()` reports
    // that it should be committed.
    commit_after: Duration,
    // The mutex that is locked while a transaction is being started.
    coop: &'conn Mutex<()>,
}
113
114impl<'conn> ChunkedCoopTransaction<'conn> {
115    /// Begin a new transaction which may be split into multiple transactions
116    /// for performance reasons. Cannot be nested, but this is not
117    /// enforced - however, it is enforced by SQLite; use a rusqlite `savepoint`
118    /// for nested transactions.
119    pub fn new(
120        conn: &'conn Connection,
121        commit_after: Duration,
122        coop: &'conn Mutex<()>,
123    ) -> Result<Self> {
124        let _lock = coop.lock();
125        let tx = get_tx_with_retry_on_locked(conn)?;
126        Ok(Self {
127            tx,
128            commit_after,
129            coop,
130        })
131    }
132
133    /// Returns `true` if the current transaction has been open for longer than
134    /// the requested time, and should be committed; `false` otherwise. In most
135    /// cases, there's no need to use this method, since `maybe_commit()` does
136    /// so internally. It's exposed for consumers that need to run additional
137    /// pre-commit logic, like cleaning up temp tables.
138    ///
139    /// If this method returns `true`, it's guaranteed that `maybe_commit()`
140    /// will commit the transaction.
141    #[inline]
142    pub fn should_commit(&self) -> bool {
143        self.tx.started_at.elapsed() >= self.commit_after
144    }
145
146    /// Checks to see if we have held a transaction for longer than the
147    /// requested time, and if so, commits the current transaction and opens
148    /// another.
149    #[inline]
150    pub fn maybe_commit(&mut self) -> Result<()> {
151        if self.should_commit() {
152            debug!("ChunkedCoopTransaction committing after taking allocated time");
153            self.commit_and_start_new_tx()?;
154        }
155        Ok(())
156    }
157
158    fn commit_and_start_new_tx(&mut self) -> Result<()> {
159        // We can't call self.tx.commit() here as it wants to consume
160        // self.tx, and we can't set up the new self.tx first as then
161        // we'll be trying to start a new transaction while the current
162        // one is in progress. So explicitly set the finished flag on it.
163        self.tx.finished = true;
164        self.tx.execute_batch("COMMIT")?;
165        // acquire a lock on our cooperator - if our only other writer
166        // thread holds a write lock we'll block until it is released.
167        // Note however that sqlite might still return a locked error if the
168        // database is being checkpointed - so we still perform exactly 1 retry,
169        // which we do while we have the lock, because we don't want our other
170        // write connection to win this race either.
171        let _lock = self.coop.lock();
172        self.tx = get_tx_with_retry_on_locked(self.tx.conn)?;
173        Ok(())
174    }
175
176    /// Consumes and commits a ChunkedCoopTransaction transaction.
177    pub fn commit(self) -> Result<()> {
178        self.tx.commit()?;
179        Ok(())
180    }
181
182    /// Consumes and rolls a ChunkedCoopTransaction, but potentially only back
183    /// to the last `maybe_commit`.
184    pub fn rollback(self) -> Result<()> {
185        self.tx.rollback()?;
186        Ok(())
187    }
188}
189
190impl Deref for ChunkedCoopTransaction<'_> {
191    type Target = Connection;
192
193    #[inline]
194    fn deref(&self) -> &Connection {
195        self.tx.conn
196    }
197}
198
199impl ConnExt for ChunkedCoopTransaction<'_> {
200    #[inline]
201    fn conn(&self) -> &Connection {
202        self
203    }
204}
205
206// A helper that attempts to get an Immediate lock on the DB. If it fails with
207// a "busy" or "locked" error, it does exactly 1 retry.
208fn get_tx_with_retry_on_locked(conn: &Connection) -> Result<UncheckedTransaction<'_>> {
209    let behavior = TransactionBehavior::Immediate;
210    match UncheckedTransaction::new(conn, behavior) {
211        Ok(tx) => Ok(tx),
212        Err(rusqlite::Error::SqliteFailure(err, _))
213            if err.code == rusqlite::ErrorCode::DatabaseBusy
214                || err.code == rusqlite::ErrorCode::DatabaseLocked =>
215        {
216            // retry the lock - we assume that this lock request still
217            // blocks for the default period, so we don't need to sleep
218            // etc.
219            let started_at = Instant::now();
220            warn!("Attempting to get a read lock failed - doing one retry");
221            let tx = UncheckedTransaction::new(conn, behavior).inspect_err(|_err| {
222                warn!("Retrying the lock failed after {:?}", started_at.elapsed());
223            })?;
224            info!("Retrying the lock worked after {:?}", started_at.elapsed());
225            Ok(tx)
226        }
227        Err(e) => Err(e.into()),
228    }
229}