sql_support/
lazy.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

use crate::open_database::{open_database_with_flags, ConnectionInitializer, Error};
use interrupt_support::{register_interrupt, SqlInterruptHandle, SqlInterruptScope};
use parking_lot::{MappedMutexGuard, Mutex, MutexGuard};
use rusqlite::{Connection, OpenFlags};
use std::{
    path::{Path, PathBuf},
    sync::{Arc, Weak},
};

/// Lazily-loaded database with interruption support
///
/// In addition to the [Self::interrupt] method, LazyDb also calls
/// [interrupt_support::register_interrupt] on any opened database.  This means that if
/// [interrupt_support::shutdown] is called it will interrupt this database if it's open and
/// in-use.
pub struct LazyDb<CI> {
    path: PathBuf,
    open_flags: OpenFlags,
    connection_initializer: CI,
    // Note: if you're going to lock both mutexes at once, make sure to lock the connection mutex
    // first.  Otherwise, you risk creating a deadlock where two threads each hold one of the locks
    // and is waiting for the other.
    connection: Mutex<Option<Connection>>,
    // It's important to use a separate mutex for the interrupt handle, since the whole point is to
    // interrupt while another thread holds the connection mutex.  Since the only mutation is
    // setting/unsetting the Arc, maybe this could be sped up by using something like
    // `arc_swap::ArcSwap`, but that seems like overkill for our purposes. This mutex should rarely
    // be contested and interrupt operations execute quickly.
    interrupt_handle: Mutex<Option<Arc<SqlInterruptHandle>>>,
}

impl<CI: ConnectionInitializer> LazyDb<CI> {
    /// Create a new LazyDb
    ///
    /// This does not open the connection and is non-blocking
    pub fn new(path: &Path, open_flags: OpenFlags, connection_initializer: CI) -> Self {
        Self {
            path: path.to_owned(),
            open_flags,
            connection_initializer,
            connection: Mutex::new(None),
            interrupt_handle: Mutex::new(None),
        }
    }

    /// Lock the database mutex and get a connection and interrupt scope.
    ///
    /// If the connection is closed, it will be opened.
    ///
    /// Calling `lock` again, or calling `close`, from the same thread while the mutex guard is
    /// still alive will cause a deadlock.
    pub fn lock(&self) -> Result<(MappedMutexGuard<'_, Connection>, SqlInterruptScope), Error> {
        // Call get_conn first, then get_scope to ensure we acquire the locks in the correct order
        let conn = self.get_conn()?;
        let scope = self.get_scope(&conn)?;
        Ok((conn, scope))
    }

    fn get_conn(&self) -> Result<MappedMutexGuard<'_, Connection>, Error> {
        let mut guard = self.connection.lock();
        // Open the database if it wasn't opened before.  Do this outside of the MutexGuard::map call to simplify the error handling
        if guard.is_none() {
            *guard = Some(open_database_with_flags(
                &self.path,
                self.open_flags,
                &self.connection_initializer,
            )?);
        };
        // Use MutexGuard::map to get a Connection rather than Option<Connection>.  The unwrap()
        // call can't fail because of the previous code.
        Ok(MutexGuard::map(guard, |conn_option| {
            conn_option.as_mut().unwrap()
        }))
    }

    fn get_scope(&self, conn: &Connection) -> Result<SqlInterruptScope, Error> {
        let mut handle_guard = self.interrupt_handle.lock();
        let result = match handle_guard.as_ref() {
            Some(handle) => handle.begin_interrupt_scope(),
            None => {
                let handle = Arc::new(SqlInterruptHandle::new(conn));
                register_interrupt(
                    Arc::downgrade(&handle) as Weak<dyn AsRef<SqlInterruptHandle> + Send + Sync>
                );
                handle_guard.insert(handle).begin_interrupt_scope()
            }
        };
        // If we see an Interrupted error when beginning the scope, it means that we're in shutdown
        // mode.
        result.map_err(|_| Error::Shutdown)
    }

    /// Close the database if it's open
    ///
    /// Pass interrupt=true to interrupt any in-progress queries before closing the database.
    ///
    /// Do not call `close` if you already have a lock on the database in the current thread, as
    /// this will cause a deadlock.
    pub fn close(&self, interrupt: bool) {
        let mut interrupt_handle = self.interrupt_handle.lock();
        if let Some(handle) = interrupt_handle.as_ref() {
            if interrupt {
                handle.interrupt();
            }
            *interrupt_handle = None;
        }
        // Drop the interrupt handle lock to avoid holding both locks at once.
        drop(interrupt_handle);
        *self.connection.lock() = None;
    }

    /// Interrupt any in-progress queries
    pub fn interrupt(&self) {
        if let Some(handle) = self.interrupt_handle.lock().as_ref() {
            handle.interrupt();
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::open_database::test_utils::TestConnectionInitializer;

    fn open_test_db() -> LazyDb<TestConnectionInitializer> {
        LazyDb::new(
            Path::new(":memory:"),
            OpenFlags::default(),
            TestConnectionInitializer::new(),
        )
    }

    #[test]
    fn test_interrupt() {
        let lazy_db = open_test_db();
        let (_, scope) = lazy_db.lock().unwrap();
        assert!(!scope.was_interrupted());
        lazy_db.interrupt();
        assert!(scope.was_interrupted());
    }

    #[test]
    fn interrupt_before_db_is_opened_should_not_fail() {
        let lazy_db = open_test_db();
        lazy_db.interrupt();
    }
}