Commit ec926b0f authored by Sergey Vojtovich's avatar Sergey Vojtovich

Fixed RocksDB to follow THD ha_data protocol

Use thd_get_ha_data()/thd_set_ha_data() which protect against plugin
removal until it has THD ha_data.

Do not reset THD ha_data in rocksdb_close_connection(), cleaner approach
is to let ha_close_connection() do it.

Removed transaction objects cleanup from rocksdb_done_func(). As we lock
plugin properly, there must be no transaction objects during RocksDB
shutdown.

Part of MDEV-19515 - Improve connect speed
parent 8c8d584f
...@@ -3290,9 +3290,9 @@ void Rdb_snapshot_notifier::SnapshotCreated( ...@@ -3290,9 +3290,9 @@ void Rdb_snapshot_notifier::SnapshotCreated(
std::multiset<Rdb_transaction *> Rdb_transaction::s_tx_list; std::multiset<Rdb_transaction *> Rdb_transaction::s_tx_list;
mysql_mutex_t Rdb_transaction::s_tx_list_mutex; mysql_mutex_t Rdb_transaction::s_tx_list_mutex;
static Rdb_transaction *&get_tx_from_thd(THD *const thd) { static Rdb_transaction *get_tx_from_thd(THD *const thd) {
return *reinterpret_cast<Rdb_transaction **>( return reinterpret_cast<Rdb_transaction *>(
my_core::thd_ha_data(thd, rocksdb_hton)); my_core::thd_get_ha_data(thd, rocksdb_hton));
} }
namespace { namespace {
...@@ -3339,7 +3339,7 @@ class Rdb_perf_context_guard { ...@@ -3339,7 +3339,7 @@ class Rdb_perf_context_guard {
*/ */
static Rdb_transaction *get_or_create_tx(THD *const thd) { static Rdb_transaction *get_or_create_tx(THD *const thd) {
Rdb_transaction *&tx = get_tx_from_thd(thd); Rdb_transaction *tx = get_tx_from_thd(thd);
// TODO: this is called too many times.. O(#rows) // TODO: this is called too many times.. O(#rows)
if (tx == nullptr) { if (tx == nullptr) {
bool rpl_skip_tx_api= false; // MARIAROCKS_NOT_YET. bool rpl_skip_tx_api= false; // MARIAROCKS_NOT_YET.
...@@ -3354,6 +3354,7 @@ static Rdb_transaction *get_or_create_tx(THD *const thd) { ...@@ -3354,6 +3354,7 @@ static Rdb_transaction *get_or_create_tx(THD *const thd) {
} }
tx->set_params(THDVAR(thd, lock_wait_timeout), THDVAR(thd, max_row_locks)); tx->set_params(THDVAR(thd, lock_wait_timeout), THDVAR(thd, max_row_locks));
tx->start_tx(); tx->start_tx();
my_core::thd_set_ha_data(thd, rocksdb_hton, tx);
} else { } else {
tx->set_params(THDVAR(thd, lock_wait_timeout), THDVAR(thd, max_row_locks)); tx->set_params(THDVAR(thd, lock_wait_timeout), THDVAR(thd, max_row_locks));
if (!tx->is_tx_started()) { if (!tx->is_tx_started()) {
...@@ -3365,7 +3366,7 @@ static Rdb_transaction *get_or_create_tx(THD *const thd) { ...@@ -3365,7 +3366,7 @@ static Rdb_transaction *get_or_create_tx(THD *const thd) {
} }
static int rocksdb_close_connection(handlerton *const hton, THD *const thd) { static int rocksdb_close_connection(handlerton *const hton, THD *const thd) {
Rdb_transaction *&tx = get_tx_from_thd(thd); Rdb_transaction *tx = get_tx_from_thd(thd);
if (tx != nullptr) { if (tx != nullptr) {
int rc = tx->finish_bulk_load(false); int rc = tx->finish_bulk_load(false);
if (rc != 0) { if (rc != 0) {
...@@ -3376,7 +3377,6 @@ static int rocksdb_close_connection(handlerton *const hton, THD *const thd) { ...@@ -3376,7 +3377,6 @@ static int rocksdb_close_connection(handlerton *const hton, THD *const thd) {
} }
delete tx; delete tx;
tx = nullptr;
} }
return HA_EXIT_SUCCESS; return HA_EXIT_SUCCESS;
} }
...@@ -3444,7 +3444,7 @@ static int rocksdb_prepare(handlerton* hton, THD* thd, bool prepare_tx) ...@@ -3444,7 +3444,7 @@ static int rocksdb_prepare(handlerton* hton, THD* thd, bool prepare_tx)
{ {
bool async=false; // This is "ASYNC_COMMIT" feature which is only present in webscalesql bool async=false; // This is "ASYNC_COMMIT" feature which is only present in webscalesql
Rdb_transaction *&tx = get_tx_from_thd(thd); Rdb_transaction *tx = get_tx_from_thd(thd);
if (!tx->can_prepare()) { if (!tx->can_prepare()) {
return HA_EXIT_FAILURE; return HA_EXIT_FAILURE;
} }
...@@ -3695,7 +3695,7 @@ static void rocksdb_commit_ordered(handlerton *hton, THD* thd, bool all) ...@@ -3695,7 +3695,7 @@ static void rocksdb_commit_ordered(handlerton *hton, THD* thd, bool all)
// Same assert as InnoDB has // Same assert as InnoDB has
DBUG_ASSERT(all || (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | DBUG_ASSERT(all || (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT |
OPTION_BEGIN))); OPTION_BEGIN)));
Rdb_transaction *&tx = get_tx_from_thd(thd); Rdb_transaction *tx = get_tx_from_thd(thd);
if (!tx->is_two_phase()) { if (!tx->is_two_phase()) {
/* /*
ordered_commit is supposedly slower as it is done sequentially ordered_commit is supposedly slower as it is done sequentially
...@@ -3727,7 +3727,7 @@ static int rocksdb_commit(handlerton* hton, THD* thd, bool commit_tx) ...@@ -3727,7 +3727,7 @@ static int rocksdb_commit(handlerton* hton, THD* thd, bool commit_tx)
rocksdb::StopWatchNano timer(rocksdb::Env::Default(), true); rocksdb::StopWatchNano timer(rocksdb::Env::Default(), true);
/* note: h->external_lock(F_UNLCK) is called after this function is called) */ /* note: h->external_lock(F_UNLCK) is called after this function is called) */
Rdb_transaction *&tx = get_tx_from_thd(thd); Rdb_transaction *tx = get_tx_from_thd(thd);
/* this will trigger saving of perf_context information */ /* this will trigger saving of perf_context information */
Rdb_perf_context_guard guard(tx, rocksdb_perf_context_level(thd)); Rdb_perf_context_guard guard(tx, rocksdb_perf_context_level(thd));
...@@ -3800,7 +3800,7 @@ static int rocksdb_commit(handlerton* hton, THD* thd, bool commit_tx) ...@@ -3800,7 +3800,7 @@ static int rocksdb_commit(handlerton* hton, THD* thd, bool commit_tx)
static int rocksdb_rollback(handlerton *const hton, THD *const thd, static int rocksdb_rollback(handlerton *const hton, THD *const thd,
bool rollback_tx) { bool rollback_tx) {
Rdb_transaction *&tx = get_tx_from_thd(thd); Rdb_transaction *tx = get_tx_from_thd(thd);
Rdb_perf_context_guard guard(tx, rocksdb_perf_context_level(thd)); Rdb_perf_context_guard guard(tx, rocksdb_perf_context_level(thd));
if (tx != nullptr) { if (tx != nullptr) {
...@@ -4607,7 +4607,7 @@ static int rocksdb_savepoint(handlerton *const hton, THD *const thd, ...@@ -4607,7 +4607,7 @@ static int rocksdb_savepoint(handlerton *const hton, THD *const thd,
static int rocksdb_rollback_to_savepoint(handlerton *const hton, THD *const thd, static int rocksdb_rollback_to_savepoint(handlerton *const hton, THD *const thd,
void *const savepoint) { void *const savepoint) {
Rdb_transaction *&tx = get_tx_from_thd(thd); Rdb_transaction *tx = get_tx_from_thd(thd);
return tx->rollback_to_savepoint(savepoint); return tx->rollback_to_savepoint(savepoint);
} }
...@@ -5346,49 +5346,6 @@ static int rocksdb_done_func(void *const p) { ...@@ -5346,49 +5346,6 @@ static int rocksdb_done_func(void *const p) {
error = 1; error = 1;
} }
/*
MariaDB: When the plugin is unloaded with UNINSTALL SONAME command, some
connections may still have Rdb_transaction objects.
These objects are not genuine transactions (as SQL layer makes sure that
a plugin that is being unloaded has no open tables), they are empty
Rdb_transaction objects that were left there to save on object
creation/deletion.
Go through the list and delete them.
*/
{
class Rdb_trx_deleter: public Rdb_tx_list_walker {
public:
std::set<Rdb_transaction*> rdb_trxs;
void process_tran(const Rdb_transaction *const tx) override {
/*
Check if the transaction is really empty. We only check
non-WriteBatch-based transactions, because there is no easy way to
check WriteBatch-based transactions.
*/
if (!tx->is_writebatch_trx()) {
const auto tx_impl = static_cast<const Rdb_transaction_impl *>(tx);
DBUG_ASSERT(tx_impl);
if (tx_impl->get_rdb_trx())
DBUG_ASSERT(0);
}
rdb_trxs.insert((Rdb_transaction*)tx);
};
} deleter;
Rdb_transaction::walk_tx_list(&deleter);
for (std::set<Rdb_transaction*>::iterator it= deleter.rdb_trxs.begin();
it != deleter.rdb_trxs.end();
++it)
{
// When a transaction is deleted, it removes itself from s_tx_list.
delete *it;
}
}
/* /*
destructors for static objects can be called at _exit(), destructors for static objects can be called at _exit(),
but we want to free the memory at dlclose() but we want to free the memory at dlclose()
...@@ -13838,7 +13795,7 @@ int rocksdb_check_bulk_load( ...@@ -13838,7 +13795,7 @@ int rocksdb_check_bulk_load(
return 1; return 1;
} }
Rdb_transaction *&tx = get_tx_from_thd(thd); Rdb_transaction *tx = get_tx_from_thd(thd);
if (tx != nullptr) { if (tx != nullptr) {
const int rc = tx->finish_bulk_load(); const int rc = tx->finish_bulk_load();
if (rc != 0) { if (rc != 0) {
......
...@@ -2,14 +2,18 @@ ...@@ -2,14 +2,18 @@
# MDEV-14843: Assertion `s_tx_list.size() == 0' failed in myrocks::Rdb_transaction::term_mutex # MDEV-14843: Assertion `s_tx_list.size() == 0' failed in myrocks::Rdb_transaction::term_mutex
# #
INSTALL SONAME 'ha_rocksdb'; INSTALL SONAME 'ha_rocksdb';
connect con1,localhost,root,,test;
CREATE TABLE t1 (i INT) ENGINE=RocksDB; CREATE TABLE t1 (i INT) ENGINE=RocksDB;
insert into t1 values (1); insert into t1 values (1);
connect con1,localhost,root,,;
connection con1;
insert into test.t1 values (1);
connection default;
DROP TABLE t1; DROP TABLE t1;
connection default;
UNINSTALL SONAME 'ha_rocksdb'; UNINSTALL SONAME 'ha_rocksdb';
Warnings:
Warning 1620 Plugin is busy and will be uninstalled on shutdown
SELECT ENGINE, SUPPORT FROM INFORMATION_SCHEMA.ENGINES WHERE ENGINE='ROCKSDB';
ENGINE SUPPORT
ROCKSDB NO
disconnect con1;
# #
# MDEV-15686: Loading MyRocks plugin back after it has been unloaded causes a crash # MDEV-15686: Loading MyRocks plugin back after it has been unloaded causes a crash
# #
......
--source include/have_log_bin.inc --source include/have_log_bin.inc
--source include/have_binlog_format_row.inc --source include/have_binlog_format_row.inc
--source include/not_windows.inc
--echo # --echo #
--echo # MDEV-14843: Assertion `s_tx_list.size() == 0' failed in myrocks::Rdb_transaction::term_mutex --echo # MDEV-14843: Assertion `s_tx_list.size() == 0' failed in myrocks::Rdb_transaction::term_mutex
...@@ -14,18 +15,21 @@ ...@@ -14,18 +15,21 @@
INSTALL SONAME 'ha_rocksdb'; INSTALL SONAME 'ha_rocksdb';
--enable_warnings --enable_warnings
connect (con1,localhost,root,,test);
CREATE TABLE t1 (i INT) ENGINE=RocksDB; CREATE TABLE t1 (i INT) ENGINE=RocksDB;
insert into t1 values (1); insert into t1 values (1);
DROP TABLE t1;
connect (con1,localhost,root,,);
connection con1;
insert into test.t1 values (1);
connection default; connection default;
# Cleanup # Cleanup
DROP TABLE t1;
UNINSTALL SONAME 'ha_rocksdb'; UNINSTALL SONAME 'ha_rocksdb';
SELECT ENGINE, SUPPORT FROM INFORMATION_SCHEMA.ENGINES WHERE ENGINE='ROCKSDB';
disconnect con1;
# Unfortunately this is the only more or less reliable way to wait until
# connection done ha_close_connections(). It doesn't work on Windows due
# to different thread handling.
let $wait_condition= SELECT VARIABLE_VALUE=1 FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME='Threads_cached';
--source include/wait_condition.inc
--echo # --echo #
--echo # MDEV-15686: Loading MyRocks plugin back after it has been unloaded causes a crash --echo # MDEV-15686: Loading MyRocks plugin back after it has been unloaded causes a crash
......
...@@ -52,8 +52,8 @@ namespace myrocks { ...@@ -52,8 +52,8 @@ namespace myrocks {
Since we cannot or don't want to change the API in any way, we can use this Since we cannot or don't want to change the API in any way, we can use this
mechanism to define readability tokens that look like C++ namespaces, but are mechanism to define readability tokens that look like C++ namespaces, but are
not enforced in any way by the compiler, since the pre-compiler strips them not enforced in any way by the compiler, since the pre-compiler strips them
out. However, on the calling side, code looks like my_core::thd_ha_data() out. However, on the calling side, code looks like my_core::thd_get_ha_data()
rather than plain a thd_ha_data() call. This technique adds an immediate rather than plain a thd_get_ha_data() call. This technique adds an immediate
visible cue on what type of API we are calling into. visible cue on what type of API we are calling into.
*/ */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment