// places/db/tx/coop_transaction.rs
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
//! This implements "cooperative transactions" for places. It relies on our
//! decision to have exactly 1 general purpose "writer" connection and exactly
//! one "sync writer" - ie, exactly 2 write connections.
//!
//! We'll describe the implementation and strategy, but note that most callers
//! should use `PlacesDb::begin_transaction()`, which will do the right thing
//! for your db type.
//!
//! The idea is that anything that uses the sync connection should use
//! `chunked_coop_trransaction`. Code using this should regularly call
//! `maybe_commit()`, which will commit the current transaction and start a
//! new one once roughly a second has elapsed.
//!
//! This means that in theory the other writable connection can start
//! transactions and should block for a max of 1 second - well under the 5
//! seconds before that other writer will fail with a SQLITE_BUSY or similar error.
//!
//! However, in practice we see the writer thread being starved - even though
//! it's waiting for a lock, the sync thread still manages to re-get the lock.
//! In other words, the locks used by sqlite aren't "fair".
//!
//! So we mitigate this with a simple approach that works fine within our
//! "exactly 2 writers" constraints:
//! * Each database connection shares a mutex.
//! * Before starting a transaction, each connection locks the mutex.
//! * It then starts an "immediate" transaction - because sqlite now holds a
//! lock on our behalf, we release the lock on the mutex.
//!
//! In other words, the lock is held only while obtaining the DB lock, then
//! immediately released.
//!
//! The end result here is that if either connection is waiting for the
//! database lock because the other already holds it, the waiting one is
//! guaranteed to get the database lock next.
//!
//! One additional wrinkle here is that even if there was exactly one writer,
//! there's still a possibility of SQLITE_BUSY if the database is being
//! checkpointed. So we handle that case and perform exactly 1 retry.
use crate::api::places_api::ConnectionType;
use crate::db::PlacesDb;
use crate::error::*;
use parking_lot::Mutex;
use rusqlite::{Connection, TransactionBehavior};
use sql_support::{ConnExt, UncheckedTransaction};
use std::ops::Deref;
use std::time::{Duration, Instant};
impl PlacesDb {
    /// Begin a ChunkedCoopTransaction. Must be called from the
    /// sync connection, see module doc for details.
    pub(super) fn chunked_coop_trransaction(&self) -> Result<ChunkedCoopTransaction<'_>> {
        // Restricting this to the Sync connection isn't required for
        // correctness - it just happens to be the right choice for everything
        // we expose and plan on exposing. Relax it if a write conn ever has a
        // genuine need.
        assert_eq!(
            self.conn_type(),
            ConnectionType::Sync,
            "chunked_coop_trransaction must only be called by the Sync connection"
        );
        // The commit window is deliberately not a parameter: it's closely
        // related to the timeouts configured on the database itself.
        ChunkedCoopTransaction::new(self.conn(), Duration::from_secs(1), &self.coop_tx_lock)
    }

    /// Begin a "coop" transaction. Must be called from the write connection, see
    /// module doc for details.
    pub(super) fn coop_transaction(&self) -> Result<UncheckedTransaction<'_>> {
        // Only validate transaction types for ConnectionType::ReadWrite.
        assert_eq!(
            self.conn_type(),
            ConnectionType::ReadWrite,
            "coop_transaction must only be called on the ReadWrite connection"
        );
        // Hold the shared mutex only while obtaining the database lock; the
        // guard drops when this function returns, at which point sqlite
        // itself holds a lock on our behalf (see module doc).
        let _guard = self.coop_tx_lock.lock();
        get_tx_with_retry_on_locked(self.conn())
    }
}
/// A transaction which is periodically committed and restarted. It's suitable
/// when transactions are used purely (or mostly) for performance rather than
/// for atomicity - for example, bulk updates of many rows, where integrity
/// concerns could be addressed by multiple, smaller transactions.
///
/// Call `.maybe_commit()` regularly as part of your processing; once the
/// current transaction has been open for longer than the configured duration
/// it will be committed and a fresh one started. Always call `.commit()` at
/// the end. Because earlier chunks may already be committed, it's very
/// difficult to reason about what a rollback would actually undo - the
/// provided `rollback()` only rolls back to the last `maybe_commit`, so if
/// you need explicit roll-back semantics this probably isn't what you should
/// be using. Note that SQLite might roll back for its own reasons though.
///
/// This can still be used for transactions which ensure data integrity: when
/// processing a large group of items where each item requires multiple
/// updates, call `.maybe_commit()` only between items, never between the
/// individual updates for a single item.
pub struct ChunkedCoopTransaction<'conn> {
    // Shared mutex used to cooperate with the other write connection.
    coop: &'conn Mutex<()>,
    // How long a chunk may stay open before `maybe_commit` commits it.
    commit_after: Duration,
    // The currently active transaction; replaced each time a chunk commits.
    tx: UncheckedTransaction<'conn>,
}
impl<'conn> ChunkedCoopTransaction<'conn> {
    /// Begin a new transaction which may be split into multiple transactions
    /// for performance reasons. Cannot be nested, but this is not
    /// enforced - however, it is enforced by SQLite; use a rusqlite `savepoint`
    /// for nested transactions.
    pub fn new(
        conn: &'conn Connection,
        commit_after: Duration,
        coop: &'conn Mutex<()>,
    ) -> Result<Self> {
        // Take the shared mutex before asking sqlite for the write lock so
        // that a waiting writer is guaranteed the database next (see module
        // doc). The guard drops when we return - it's held only while the
        // DB lock is being obtained.
        let _lock = coop.lock();
        let tx = get_tx_with_retry_on_locked(conn)?;
        Ok(Self {
            tx,
            commit_after,
            coop,
        })
    }

    /// Returns `true` if the current transaction has been open for longer than
    /// the requested time, and should be committed; `false` otherwise. In most
    /// cases, there's no need to use this method, since `maybe_commit()` does
    /// so internally. It's exposed for consumers that need to run additional
    /// pre-commit logic, like cleaning up temp tables.
    ///
    /// If this method returns `true`, it's guaranteed that `maybe_commit()`
    /// will commit the transaction.
    #[inline]
    pub fn should_commit(&self) -> bool {
        self.tx.started_at.elapsed() >= self.commit_after
    }

    /// Checks to see if we have held a transaction for longer than the
    /// requested time, and if so, commits the current transaction and opens
    /// another.
    #[inline]
    pub fn maybe_commit(&mut self) -> Result<()> {
        if self.should_commit() {
            log::debug!("ChunkedCoopTransaction committing after taking allocated time");
            self.commit_and_start_new_tx()?;
        }
        Ok(())
    }

    // Commits the current transaction and immediately begins a replacement,
    // cooperating with the other write connection via `self.coop`.
    fn commit_and_start_new_tx(&mut self) -> Result<()> {
        // We can't call self.tx.commit() here as it wants to consume
        // self.tx, and we can't set up the new self.tx first as then
        // we'll be trying to start a new transaction while the current
        // one is in progress. So explicitly set the finished flag on it.
        // (NOTE(review): presumably `finished = true` also stops Drop from
        // attempting a rollback if the COMMIT below fails - confirm against
        // `UncheckedTransaction` in sql_support.)
        self.tx.finished = true;
        self.tx.execute_batch("COMMIT")?;
        // acquire a lock on our cooperator - if our only other writer
        // thread holds a write lock we'll block until it is released.
        // Note however that sqlite might still return a locked error if the
        // database is being checkpointed - so we still perform exactly 1 retry,
        // which we do while we have the lock, because we don't want our other
        // write connection to win this race either.
        let _lock = self.coop.lock();
        self.tx = get_tx_with_retry_on_locked(self.tx.conn)?;
        Ok(())
    }

    /// Consumes and commits a ChunkedCoopTransaction transaction.
    pub fn commit(self) -> Result<()> {
        self.tx.commit()?;
        Ok(())
    }

    /// Consumes and rolls a ChunkedCoopTransaction, but potentially only back
    /// to the last `maybe_commit`.
    pub fn rollback(self) -> Result<()> {
        self.tx.rollback()?;
        Ok(())
    }
}
impl Deref for ChunkedCoopTransaction<'_> {
    type Target = Connection;

    /// Allow a `ChunkedCoopTransaction` to be used anywhere a plain
    /// `&Connection` is expected.
    #[inline]
    fn deref(&self) -> &Self::Target {
        self.tx.conn
    }
}
impl ConnExt for ChunkedCoopTransaction<'_> {
    /// Expose the underlying connection so the `ConnExt` helpers work
    /// directly on the chunked transaction.
    #[inline]
    fn conn(&self) -> &Connection {
        self.tx.conn
    }
}
// A helper that attempts to get an Immediate lock on the DB. If it fails with
// a "busy" or "locked" error, it does exactly 1 retry.
fn get_tx_with_retry_on_locked(conn: &Connection) -> Result<UncheckedTransaction<'_>> {
    let behavior = TransactionBehavior::Immediate;
    match UncheckedTransaction::new(conn, behavior) {
        Ok(tx) => Ok(tx),
        Err(rusqlite::Error::SqliteFailure(err, _))
            if matches!(
                err.code,
                rusqlite::ErrorCode::DatabaseBusy | rusqlite::ErrorCode::DatabaseLocked
            ) =>
        {
            // retry the lock - we assume that this lock request still
            // blocks for the default period, so we don't need to sleep
            // etc.
            let started_at = Instant::now();
            // This is an Immediate transaction, so it's a *write* lock we
            // failed to get - the previous "read lock" wording was misleading.
            log::warn!("Attempting to get a write lock failed - doing one retry");
            let tx = UncheckedTransaction::new(conn, behavior).inspect_err(|_err| {
                log::warn!("Retrying the lock failed after {:?}", started_at.elapsed());
            })?;
            log::info!("Retrying the lock worked after {:?}", started_at.elapsed());
            Ok(tx)
        }
        Err(e) => Err(e.into()),
    }
}