places/db/tx/
coop_transaction.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

//! This implements "cooperative transactions" for places. It relies on our
//! decision to have exactly 1 general purpose "writer" connection and exactly
//! one "sync writer" - ie, exactly 2 write connections.
//!
//! We'll describe the implementation and strategy, but note that most callers
//! should use `PlacesDb::begin_transaction()`, which will do the right thing
//! for your db type.
//!
//! The idea is that anything that uses the sync connection should use
//! `chunked_coop_trransaction`. Code using this should regularly call
//! `maybe_commit()`, which - once the current transaction has been open for
//! at least a second - will commit that transaction and start a new one.
//!
//! This means that in theory the other writable connection can start
//! transactions and should block for a max of 1 second - well under the 5
//! seconds before that other writer will fail with a SQLITE_BUSY or similar error.
//!
//! However, in practice we see the writer thread being starved - even though
//! it's waiting for a lock, the sync thread still manages to re-get the lock.
//! In other words, the locks used by sqlite aren't "fair".
//!
//! So we mitigate this with a simple approach that works fine within our
//! "exactly 2 writers" constraints:
//! * Each database connection shares a mutex.
//! * Before starting a transaction, each connection locks the mutex.
//! * It then starts an "immediate" transaction - because sqlite now holds a
//!   lock on our behalf, we release the lock on the mutex.
//!
//! In other words, the lock is held only while obtaining the DB lock, then
//! immediately released.
//!
//! The end result here is that if either connection is waiting for the
//! database lock because the other already holds it, the waiting one is
//! guaranteed to get the database lock next.
//!
//! One additional wrinkle here is that even if there was exactly one writer,
//! there's still a possibility of SQLITE_BUSY if the database is being
//! checkpointed. So we handle that case and perform exactly 1 retry.

use crate::api::places_api::ConnectionType;
use crate::db::PlacesDb;
use crate::error::*;
use parking_lot::Mutex;
use rusqlite::{Connection, TransactionBehavior};
use sql_support::{ConnExt, UncheckedTransaction};
use std::ops::Deref;
use std::time::{Duration, Instant};

impl PlacesDb {
    /// Starts a [`ChunkedCoopTransaction`] on this connection, using the
    /// cooperative lock shared between the write connections (see the module
    /// docs for the overall strategy).
    ///
    /// # Panics
    ///
    /// Panics if this is not the `Sync` connection.
    pub(super) fn chunked_coop_trransaction(&self) -> Result<ChunkedCoopTransaction<'_>> {
        // Correctness doesn't depend on this restriction - it simply matches
        // everything we expose today (and plan to expose), so it could be
        // relaxed if a regular write connection ever needs a chunked
        // transaction.
        let conn_type = self.conn_type();
        assert_eq!(
            conn_type,
            ConnectionType::Sync,
            "chunked_coop_trransaction must only be called by the Sync connection"
        );
        // The chunk length is deliberately not a parameter: it is closely
        // tied to the timeouts configured on the database itself.
        let chunk_budget = Duration::from_secs(1);
        ChunkedCoopTransaction::new(self.conn(), chunk_budget, &self.coop_tx_lock)
    }

    /// Starts a "coop" transaction: an unchunked transaction that is begun
    /// while holding the cooperative lock (see the module docs for details).
    ///
    /// # Panics
    ///
    /// Panics if this is not the `ReadWrite` connection.
    pub(super) fn coop_transaction(&self) -> Result<UncheckedTransaction<'_>> {
        // The counterpart of the check in `chunked_coop_trransaction`: this
        // flavour of transaction is reserved for the ReadWrite connection.
        let conn_type = self.conn_type();
        assert_eq!(
            conn_type,
            ConnectionType::ReadWrite,
            "coop_transaction must only be called on the ReadWrite connection"
        );
        // The guard only needs to live while the transaction is being
        // started; it is released when this function returns.
        let _guard = self.coop_tx_lock.lock();
        get_tx_with_retry_on_locked(self.conn())
    }
}

/// This transaction is suitable for when a transaction is used purely for
/// performance reasons rather than for data-integrity reasons, or when it's
/// used for integrity but held longer than strictly necessary for performance
/// reasons (ie, when it could be multiple transactions and still guarantee
/// integrity.) Examples of this might be for performance when updating a larger
/// number of rows, but data integrity concerns could be addressed by using
/// multiple, smaller transactions.
///
/// You should regularly call .maybe_commit() as part of your
/// processing, and if the current transaction has been open for at least
/// the configured duration the transaction will be committed and another one
/// started. You should always call .commit() at the end. Note that
/// .rollback() can only undo work done since the most recent commit made by
/// .maybe_commit() - anything committed before that stays committed, and it
/// will be very difficult to work out what was previously committed and
/// therefore what was rolled back. If you need to explicitly roll back the
/// whole operation, this probably isn't what you should be using. Note
/// that SQLite might rollback for its own reasons though.
///
/// Note that this can still be used for transactions which ensure data
/// integrity. For example, if you are processing a large group of items, and
/// each individual item requires multiple updates, you will probably want to
/// ensure you call .maybe_commit() after every item rather than after
/// each individual database update.
pub struct ChunkedCoopTransaction<'conn> {
    // The currently open transaction. It is replaced with a fresh one each
    // time `maybe_commit()` decides to commit.
    tx: UncheckedTransaction<'conn>,
    // How long a transaction may stay open before `maybe_commit()` commits it.
    commit_after: Duration,
    // The mutex shared between the write connections (see the module docs);
    // it is locked only while a new transaction is being started.
    coop: &'conn Mutex<()>,
}

impl<'conn> ChunkedCoopTransaction<'conn> {
    /// Starts a transaction on `conn` that may later be split into several
    /// consecutive transactions, each open for roughly `commit_after`.
    ///
    /// `coop` is locked only while the transaction is being started. Nesting
    /// is not checked here, although SQLite itself will reject it - use a
    /// rusqlite `savepoint` if you need nested transactions.
    ///
    /// # Errors
    ///
    /// Returns an error if the transaction cannot be started, including when
    /// the single retry on a busy/locked database also fails.
    pub fn new(
        conn: &'conn Connection,
        commit_after: Duration,
        coop: &'conn Mutex<()>,
    ) -> Result<Self> {
        // Held until we return, ie, until the transaction has been started
        // (or has failed to start).
        let _guard = coop.lock();
        get_tx_with_retry_on_locked(conn).map(|tx| Self {
            tx,
            commit_after,
            coop,
        })
    }

    /// Reports whether the current transaction has been open for at least
    /// the configured `commit_after` duration.
    ///
    /// `maybe_commit()` performs this check itself, so most callers never
    /// need it. It is public for consumers that must do extra work just
    /// before a commit, such as cleaning up temp tables. Whenever this
    /// returns `true`, `maybe_commit()` is guaranteed to commit.
    #[inline]
    pub fn should_commit(&self) -> bool {
        let held_for = self.tx.started_at.elapsed();
        held_for >= self.commit_after
    }

    /// Commits the current transaction and opens a new one, but only if the
    /// current one has used up its allotted time; otherwise does nothing.
    #[inline]
    pub fn maybe_commit(&mut self) -> Result<()> {
        if !self.should_commit() {
            return Ok(());
        }
        log::debug!("ChunkedCoopTransaction committing after taking allocated time");
        self.commit_and_start_new_tx()
    }

    /// Commits the open transaction in place and replaces it with a new one.
    fn commit_and_start_new_tx(&mut self) -> Result<()> {
        // `self.tx.commit()` is off the table because it consumes `self.tx`,
        // and the replacement can't be created up-front because that would
        // mean starting a transaction while one is still open. Instead we
        // mark the current transaction as finished by hand and issue the
        // COMMIT ourselves.
        // NOTE(review): the flag is set before COMMIT is known to have
        // succeeded - confirm that is the intended behaviour when COMMIT
        // fails.
        self.tx.finished = true;
        self.tx.execute_batch("COMMIT")?;
        // Take the cooperative lock: if the other writer currently holds the
        // database lock we block here until it lets go. SQLite may still
        // report the database as locked (eg, while checkpointing), so the
        // helper retries exactly once - and it does so while we hold the
        // mutex, so the other write connection can't win that race either.
        let conn = self.tx.conn;
        let _guard = self.coop.lock();
        self.tx = get_tx_with_retry_on_locked(conn)?;
        Ok(())
    }

    /// Consumes the `ChunkedCoopTransaction` and commits the transaction
    /// that is currently open.
    pub fn commit(self) -> Result<()> {
        self.tx.commit()?;
        Ok(())
    }

    /// Consumes the `ChunkedCoopTransaction` and rolls back the transaction
    /// that is currently open. Work committed by an earlier `maybe_commit`
    /// is not undone.
    pub fn rollback(self) -> Result<()> {
        self.tx.rollback()?;
        Ok(())
    }
}

/// Lets a `ChunkedCoopTransaction` be used wherever a `&Connection` is
/// expected, by exposing the connection the current transaction runs on.
impl Deref for ChunkedCoopTransaction<'_> {
    type Target = Connection;

    #[inline]
    fn deref(&self) -> &Self::Target {
        let Self { tx, .. } = self;
        tx.conn
    }
}

/// Provides the `ConnExt` helpers on the transaction by handing out the
/// underlying connection.
impl ConnExt for ChunkedCoopTransaction<'_> {
    #[inline]
    fn conn(&self) -> &Connection {
        // Go through our `Deref` impl explicitly to reach the connection.
        &**self
    }
}

// A helper that attempts to start an Immediate transaction on the DB. If that
// fails with a "busy" or "locked" error, it does exactly 1 retry. Any other
// error, or a failure of the retry itself, is returned to the caller.
//
// An Immediate transaction takes SQLite's write lock when it begins, so a
// failure here means we could not get the *write* lock.
fn get_tx_with_retry_on_locked(conn: &Connection) -> Result<UncheckedTransaction<'_>> {
    let behavior = TransactionBehavior::Immediate;
    match UncheckedTransaction::new(conn, behavior) {
        Ok(tx) => Ok(tx),
        Err(rusqlite::Error::SqliteFailure(err, _))
            if matches!(
                err.code,
                rusqlite::ErrorCode::DatabaseBusy | rusqlite::ErrorCode::DatabaseLocked
            ) =>
        {
            // retry the lock - we assume that this lock request still
            // blocks for the default period, so we don't need to sleep
            // etc.
            let started_at = Instant::now();
            log::warn!("Attempting to get a write lock failed - doing one retry");
            let tx = UncheckedTransaction::new(conn, behavior).inspect_err(|_err| {
                log::warn!("Retrying the lock failed after {:?}", started_at.elapsed());
            })?;
            log::info!("Retrying the lock worked after {:?}", started_at.elapsed());
            Ok(tx)
        }
        Err(e) => Err(e.into()),
    }
}