Skip to content

Commit

Permalink
rgw/sfs: dbconn: add connection pool
Browse files Browse the repository at this point in the history
Currently, `DBConn` keeps an instance of `Storage`, which is created by
`sqlite_orm::make_storage()`.  That first instance of `Storage` is long-lived
(the `DBConn` c'tor calls `storage->open_forever()`) so the sqlite database
is open for the entire life of the program, but this first `Storage` object
and its associated sqlite database connection pointer are largely not used
for anything much after initialization. The exception is the SFS status page,
which also uses this connection to report some sqlite statistics.

All the other threads (the workers, the garbage collector, ...) call
`DBConn::get_storage()`, which returns a copy of the first `Storage` object.
These copies don't have `open_forever()` called on them, which means every
time they're used for queries we get a pair of calls to `sqlite3_open()` and
`sqlite3_close()` at the start end end of the query.  These calls don't open
the main database file again (it's already open) but they do open and close
the WAL.

There's a couple of problems with this.  One is that the SFS status page
only sees the main thread (which is largely boring), and can't track any
of the worker threads.  The other problem is that something about not keeping
the connection open on the worker threads is relatively expensive.  If we
keep connections open rather than opening and closing with every query, we
can get something like a 20x speed increase on read queries, and at least 2x
on writes.

This new implementation gives one `Storage` object per thread, created on
demand as a copy of the first `Storage` object created in the `DBConn`
constructor.

Fixes: https://github.com/aquarist-labs/s3gw/issues/727
Signed-off-by: Tim Serong <[email protected]>
  • Loading branch information
tserong committed Oct 19, 2023
1 parent 8ddd3b6 commit 1ab688e
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 5 deletions.
30 changes: 29 additions & 1 deletion src/rgw/driver/sfs/sqlite/dbconn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,16 @@ static int sqlite_profile_callback(
}

DBConn::DBConn(CephContext* _cct)
: storage(_make_storage(getDBPath(_cct))),
: main_thread(std::this_thread::get_id()),
storage_pool_mutex(),
first_sqlite_conn(nullptr),
cct(_cct),
profile_enabled(_cct->_conf.get_val<bool>("rgw_sfs_sqlite_profile")) {
sqlite3_config(SQLITE_CONFIG_LOG, &sqlite_error_callback, cct);
// get_storage() relies on there already being an entry in the pool
// for the main thread (i.e. the thread that created the DBConn).
storage_pool.emplace(main_thread, _make_storage(getDBPath(cct)));
StorageRef storage = get_storage();
storage->on_open = [this](sqlite3* db) {
if (first_sqlite_conn == nullptr) {
first_sqlite_conn = db;
Expand Down Expand Up @@ -156,6 +161,28 @@ DBConn::DBConn(CephContext* _cct)
storage->sync_schema();
}

StorageRef DBConn::get_storage() {
std::thread::id this_thread = std::this_thread::get_id();
try {
std::shared_lock lock(storage_pool_mutex);
return &storage_pool.at(this_thread);
} catch (const std::out_of_range& ex) {
std::unique_lock lock(storage_pool_mutex);
auto [it, _] =
storage_pool.emplace(this_thread, storage_pool.at(main_thread));
StorageRef storage = &(*it).second;
// A copy of the main thread's Storage object won't have an open DB
// connection yet, so we'd better make it have one (otherwise we're
// back to a gadzillion sqlite3_open()/sqlite3_close() calls again)
storage->open_forever();
storage->busy_timeout(5000);
lsubdout(cct, rgw, 10) << "[SQLITE CONNECTION NEW] Added Storage "
<< storage << " to pool for thread " << std::hex
<< this_thread << std::dec << dendl;
return storage;
}
}

void DBConn::check_metadata_is_compatible() const {
bool sync_error = false;
std::string result_message;
Expand Down Expand Up @@ -359,6 +386,7 @@ static void upgrade_metadata(
}

void DBConn::maybe_upgrade_metadata() {
StorageRef storage = get_storage();
int db_version = get_version(cct, storage);
lsubdout(cct, rgw, 10) << "db user version: " << db_version << dendl;

Expand Down
11 changes: 7 additions & 4 deletions src/rgw/driver/sfs/sqlite/dbconn.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <filesystem>
#include <ios>
#include <memory>
#include <shared_mutex>

#include "buckets/bucket_definitions.h"
#include "buckets/multipart_definitions.h"
Expand Down Expand Up @@ -251,12 +252,14 @@ inline auto _make_storage(const std::string& path) {
);
}

using StorageImpl = decltype(_make_storage(""));
using StorageRef = StorageImpl*;
using Storage = decltype(_make_storage(""));
using StorageRef = Storage*;

class DBConn {
private:
StorageImpl storage;
std::unordered_map<std::thread::id, Storage> storage_pool;
std::thread::id main_thread;
mutable std::shared_mutex storage_pool_mutex;

public:
sqlite3* first_sqlite_conn;
Expand All @@ -269,7 +272,7 @@ class DBConn {
DBConn(const DBConn&) = delete;
DBConn& operator=(const DBConn&) = delete;

inline auto get_storage() { return &storage; }
StorageRef get_storage();

static std::string getDBPath(CephContext* cct) {
auto rgw_sfs_path = cct->_conf.get_val<std::string>("rgw_sfs_data_path");
Expand Down

0 comments on commit 1ab688e

Please sign in to comment.