Commit e493c6bb authored by Kristian Nielsen's avatar Kristian Nielsen

Merge remote-tracking branch 'my/tokudb_optimistic_parallel_replication' into 10.1

parents a68d1352 3bec0b32
......@@ -405,6 +405,7 @@ static void print_db_env_struct (void) {
"int (*set_lock_timeout) (DB_ENV *env, uint64_t default_lock_wait_time_msec, uint64_t (*get_lock_wait_time_cb)(uint64_t default_lock_wait_time))",
"int (*get_lock_timeout) (DB_ENV *env, uint64_t *lock_wait_time_msec)",
"int (*set_lock_timeout_callback) (DB_ENV *env, lock_timeout_callback callback)",
"int (*set_lock_wait_callback) (DB_ENV *env, lock_wait_callback callback)",
"int (*txn_xa_recover) (DB_ENV*, TOKU_XA_XID list[/*count*/], long count, /*out*/ long *retp, uint32_t flags)",
"int (*get_txn_from_xid) (DB_ENV*, /*in*/ TOKU_XA_XID *, /*out*/ DB_TXN **)",
"DB* (*get_db_for_directory) (DB_ENV*)",
......@@ -425,6 +426,7 @@ static void print_db_env_struct (void) {
"bool (*set_dir_per_db)(DB_ENV *, bool new_val)",
"bool (*get_dir_per_db)(DB_ENV *)",
"const char *(*get_data_dir)(DB_ENV *env)",
"void (*kill_waiter)(DB_ENV *, void *extra)",
NULL};
sort_and_dump_fields("db_env", true, extra);
......@@ -545,8 +547,8 @@ static void print_db_txn_struct (void) {
"int (*abort_with_progress)(DB_TXN*, TXN_PROGRESS_POLL_FUNCTION, void*)",
"int (*xa_prepare) (DB_TXN*, TOKU_XA_XID *, uint32_t flags)",
"uint64_t (*id64) (DB_TXN*)",
"void (*set_client_id)(DB_TXN *, uint64_t client_id)",
"uint64_t (*get_client_id)(DB_TXN *)",
"void (*set_client_id)(DB_TXN *, uint64_t client_id, void *client_extra)",
"void (*get_client_id)(DB_TXN *, uint64_t *client_id, void **client_extra)",
"bool (*is_prepared)(DB_TXN *)",
"DB_TXN *(*get_child)(DB_TXN *)",
"uint64_t (*get_start_time)(DB_TXN *)",
......@@ -750,6 +752,7 @@ int main (int argc, char *const argv[] __attribute__((__unused__))) {
printf("void toku_dbt_array_resize(DBT_ARRAY *dbts, uint32_t size) %s;\n", VISIBLE);
printf("typedef void (*lock_timeout_callback)(DB *db, uint64_t requesting_txnid, const DBT *left_key, const DBT *right_key, uint64_t blocking_txnid);\n");
printf("typedef void (*lock_wait_callback)(void *arg, uint64_t requesting_txnid, uint64_t blocking_txnid);\n");
printf("typedef int (*iterate_row_locks_callback)(DB **db, DBT *left_key, DBT *right_key, void *extra);\n");
printf("typedef int (*iterate_transactions_callback)(DB_TXN *dbtxn, iterate_row_locks_callback cb, void *locks_extra, void *extra);\n");
printf("typedef int (*iterate_requests_callback)(DB *db, uint64_t requesting_txnid, const DBT *left_key, const DBT *right_key, uint64_t blocking_txnid, uint64_t start_time, void *extra);\n");
......
......@@ -269,6 +269,7 @@ static txn_child_manager tcm;
.state = TOKUTXN_LIVE,
.num_pin = 0,
.client_id = 0,
.client_extra = nullptr,
.start_time = time(NULL),
};
......@@ -705,12 +706,14 @@ bool toku_txn_has_spilled_rollback(TOKUTXN txn) {
return txn_has_spilled_rollback_logs(txn);
}
uint64_t toku_txn_get_client_id(TOKUTXN txn) {
return txn->client_id;
void toku_txn_get_client_id(TOKUTXN txn, uint64_t *client_id, void **client_extra) {
*client_id = txn->client_id;
*client_extra = txn->client_extra;
}
void toku_txn_set_client_id(TOKUTXN txn, uint64_t client_id) {
void toku_txn_set_client_id(TOKUTXN txn, uint64_t client_id, void *client_extra) {
txn->client_id = client_id;
txn->client_extra = client_extra;
}
time_t toku_txn_get_start_time(struct tokutxn *txn) {
......
......@@ -193,6 +193,7 @@ struct tokutxn {
uint32_t num_pin; // number of threads (all hot indexes) that want this
// txn to not transition to commit or abort
uint64_t client_id;
void *client_extra;
time_t start_time;
};
typedef struct tokutxn *TOKUTXN;
......@@ -293,8 +294,8 @@ void toku_txn_unpin_live_txn(struct tokutxn *txn);
bool toku_txn_has_spilled_rollback(struct tokutxn *txn);
uint64_t toku_txn_get_client_id(struct tokutxn *txn);
void toku_txn_set_client_id(struct tokutxn *txn, uint64_t client_id);
void toku_txn_get_client_id(struct tokutxn *txn, uint64_t *client_id, void **client_extra);
void toku_txn_set_client_id(struct tokutxn *txn, uint64_t client_id, void *client_extra);
time_t toku_txn_get_start_time(struct tokutxn *txn);
......
......@@ -202,6 +202,7 @@ namespace ftcxx {
typedef uint64_t (*get_lock_wait_time_cb_func)(uint64_t);
get_lock_wait_time_cb_func _get_lock_wait_time_cb;
lock_timeout_callback _lock_timeout_callback;
lock_wait_callback _lock_wait_needed_callback;
uint64_t (*_loader_memory_size_callback)(void);
uint32_t _cachesize_gbytes;
......@@ -231,6 +232,7 @@ namespace ftcxx {
_lock_wait_time_msec(0),
_get_lock_wait_time_cb(nullptr),
_lock_timeout_callback(nullptr),
_lock_wait_needed_callback(nullptr),
_loader_memory_size_callback(nullptr),
_cachesize_gbytes(0),
_cachesize_bytes(0),
......@@ -296,6 +298,11 @@ namespace ftcxx {
handle_ft_retval(r);
}
if (_lock_wait_needed_callback) {
r = env->set_lock_wait_callback(env, _lock_wait_needed_callback);
handle_ft_retval(r);
}
if (_loader_memory_size_callback) {
env->set_loader_memory_size(env, _loader_memory_size_callback);
}
......@@ -419,6 +426,11 @@ namespace ftcxx {
return *this;
}
// Records the lock-wait callback in this builder (_lock_wait_needed_callback)
// and returns the builder itself so setter calls can be chained.
DBEnvBuilder& set_lock_wait_callback(lock_wait_callback callback) {
_lock_wait_needed_callback = callback;
return *this;
}
DBEnvBuilder& set_loader_memory_size(uint64_t (*callback)(void)) {
_loader_memory_size_callback = callback;
return *this;
......
......@@ -65,6 +65,7 @@ void lock_request::create(void) {
toku_cond_init(&m_wait_cond, nullptr);
m_start_test_callback = nullptr;
m_start_before_pending_test_callback = nullptr;
m_retry_test_callback = nullptr;
}
......@@ -79,7 +80,7 @@ void lock_request::destroy(void) {
}
// set the lock request parameters. this API allows a lock request to be reused.
void lock_request::set(locktree *lt, TXNID txnid, const DBT *left_key, const DBT *right_key, lock_request::type lock_type, bool big_txn) {
void lock_request::set(locktree *lt, TXNID txnid, const DBT *left_key, const DBT *right_key, lock_request::type lock_type, bool big_txn, void *extra) {
invariant(m_state != state::PENDING);
m_lt = lt;
m_txnid = txnid;
......@@ -91,6 +92,7 @@ void lock_request::set(locktree *lt, TXNID txnid, const DBT *left_key, const DBT
m_state = state::INITIALIZED;
m_info = lt ? lt->get_lock_request_info() : nullptr;
m_big_txn = big_txn;
m_extra = extra;
}
// get rid of any stored left and right key copies and
......@@ -173,6 +175,7 @@ int lock_request::start(void) {
m_state = state::PENDING;
m_start_time = toku_current_time_microsec() / 1000;
m_conflicting_txnid = conflicts.get(0);
if (m_start_before_pending_test_callback) m_start_before_pending_test_callback();
toku_mutex_lock(&m_info->mutex);
insert_into_lock_requests();
if (deadlock_exists(conflicts)) {
......@@ -196,14 +199,32 @@ int lock_request::wait(uint64_t wait_time_ms) {
return wait(wait_time_ms, 0, nullptr);
}
int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*killed_callback)(void)) {
int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*killed_callback)(void),
void (*lock_wait_callback)(void *, TXNID, TXNID)) {
uint64_t t_now = toku_current_time_microsec();
uint64_t t_start = t_now;
uint64_t t_end = t_start + wait_time_ms * 1000;
toku_mutex_lock(&m_info->mutex);
// check again, this time locking out other retry calls
if (m_state == state::PENDING) {
GrowableArray<TXNID> conflicts_collector;
conflicts_collector.init();
retry(&conflicts_collector);
if (m_state == state::PENDING) {
report_waits(&conflicts_collector, lock_wait_callback);
}
conflicts_collector.deinit();
}
while (m_state == state::PENDING) {
// check if this thread is killed
if (killed_callback && killed_callback()) {
remove_from_lock_requests();
complete(DB_LOCK_NOTGRANTED);
continue;
}
// compute next wait time
uint64_t t_wait;
......@@ -221,7 +242,7 @@ int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*kil
invariant(r == 0 || r == ETIMEDOUT);
t_now = toku_current_time_microsec();
if (m_state == state::PENDING && (t_now >= t_end || (killed_callback && killed_callback()))) {
if (m_state == state::PENDING && t_now >= t_end) {
m_info->counters.timeout_count += 1;
// if we're still pending and we timed out, then remove our
......@@ -273,14 +294,16 @@ TXNID lock_request::get_conflicting_txnid(void) const {
return m_conflicting_txnid;
}
int lock_request::retry(void) {
int lock_request::retry(GrowableArray<TXNID> *conflicts_collector) {
invariant(m_state == state::PENDING);
int r;
invariant(m_state == state::PENDING);
txnid_set conflicts;
conflicts.create();
if (m_type == type::WRITE) {
r = m_lt->acquire_write_lock(m_txnid, m_left_key, m_right_key, nullptr, m_big_txn);
r = m_lt->acquire_write_lock(m_txnid, m_left_key, m_right_key, &conflicts, m_big_txn);
} else {
r = m_lt->acquire_read_lock(m_txnid, m_left_key, m_right_key, nullptr, m_big_txn);
r = m_lt->acquire_read_lock(m_txnid, m_left_key, m_right_key, &conflicts, m_big_txn);
}
// if the acquisition succeeded then remove ourselves from the
......@@ -290,59 +313,105 @@ int lock_request::retry(void) {
complete(r);
if (m_retry_test_callback) m_retry_test_callback(); // test callback
toku_cond_broadcast(&m_wait_cond);
} else {
m_conflicting_txnid = conflicts.get(0);
add_conflicts_to_waits(&conflicts, conflicts_collector);
}
conflicts.destroy();
return r;
}
void lock_request::retry_all_lock_requests(locktree *lt) {
void lock_request::retry_all_lock_requests(locktree *lt, void (*lock_wait_callback)(void *, TXNID, TXNID), void (*after_retry_all_test_callback)(void)) {
lt_lock_request_info *info = lt->get_lock_request_info();
// if a thread reads this bit to be true, then it should go ahead and
// take the locktree mutex and retry lock requests. we use this bit
// to prevent every single thread from waiting on the locktree mutex
// in order to retry requests, especially when no requests actually exist.
//
// it is important to note that this bit only provides an optimization.
// it is not problematic for it to be true when it should be false,
// but it can be problematic for it to be false when it should be true.
// therefore, the lock request code must ensures that when lock requests
// are added to this locktree, the bit is set.
// see lock_request::insert_into_lock_requests()
if (!info->should_retry_lock_requests) {
info->retry_want++;
// if there are no pending lock requests than there is nothing to do
// the unlocked data race on pending_is_empty is OK since lock requests
// are retried after added to the pending set.
if (info->pending_is_empty)
return;
}
toku_mutex_lock(&info->mutex);
// let other threads know that they need not retry lock requests at this time.
//
// the motivation here is that if a bunch of threads have already released
// their locks in the rangetree, then its probably okay for only one thread
// to iterate over the list of requests and retry them. otherwise, at high
// thread counts and a large number of pending lock requests, you could
// end up wasting a lot of cycles.
info->should_retry_lock_requests = false;
size_t i = 0;
while (i < info->pending_lock_requests.size()) {
GrowableArray<TXNID> conflicts_collector;
conflicts_collector.init();
// here is the group retry algorithm.
// get the latest retry_want count and use it as the generation number of this retry operation.
// if this retry generation is > the last retry generation, then do the lock retries. otherwise,
// no lock retries are needed.
unsigned long long retry_gen = info->retry_want.load();
if (retry_gen > info->retry_done) {
// retry all of the pending lock requests.
for (size_t i = 0; i < info->pending_lock_requests.size(); ) {
lock_request *request;
int r = info->pending_lock_requests.fetch(i, &request);
invariant_zero(r);
// retry the lock request. if it didn't succeed,
// retry this lock request. if it didn't succeed,
// move on to the next lock request. otherwise
// the request is gone from the list so we may
// read the i'th entry for the next one.
r = request->retry();
r = request->retry(&conflicts_collector);
if (r != 0) {
i++;
}
}
if (after_retry_all_test_callback) after_retry_all_test_callback();
info->retry_done = retry_gen;
}
toku_mutex_unlock(&info->mutex);
// future threads should only retry lock requests if some still exist
info->should_retry_lock_requests = info->pending_lock_requests.size() > 0;
report_waits(&conflicts_collector, lock_wait_callback);
conflicts_collector.deinit();
}
// Appends one (waiting txnid, conflicting txnid) pair to wait_conflicts for
// every entry in conflicts. The waiting txnid is always this request's
// m_txnid, so the output array holds the pairs flattened as
// [m_txnid, conflict0, m_txnid, conflict1, ...].
void lock_request::add_conflicts_to_waits(txnid_set *conflicts,
                                          GrowableArray<TXNID> *wait_conflicts) {
    const size_t total = conflicts->size();
    size_t idx = 0;
    while (idx < total) {
        const TXNID blocker = conflicts->get(idx);
        wait_conflicts->push(m_txnid);
        wait_conflicts->push(blocker);
        ++idx;
    }
}
// Reports every recorded wait to lock_wait_callback.
//
// wait_conflicts is read as a flat sequence of pairs: the entry at an even
// index is the blocked txnid and the entry after it is the blocking txnid.
// The callback's first argument is always passed as nullptr here.
// Does nothing when no callback is supplied.
void lock_request::report_waits(GrowableArray<TXNID> *wait_conflicts,
                                void (*lock_wait_callback)(void *, TXNID, TXNID)) {
    if (lock_wait_callback != nullptr) {
        const size_t n_entries = wait_conflicts->get_size();
        for (size_t pos = 0; pos < n_entries; pos += 2) {
            const TXNID waiter = wait_conflicts->fetch_unchecked(pos);
            const TXNID holder = wait_conflicts->fetch_unchecked(pos + 1);
            lock_wait_callback(nullptr, waiter, holder);
        }
    }
}
// Returns the opaque pointer held in m_extra (the value stored by set()).
void *lock_request::get_extra(void) const {
return m_extra;
}
// Ends this lock request without granting it: removes it from the locktree's
// set of pending requests, completes it with DB_LOCK_NOTGRANTED, and
// broadcasts on m_wait_cond.
// NOTE(review): remove_from_lock_requests() is documented as requiring the
// lock request info mutex; the static kill_waiter(lt, extra) overload takes
// that mutex before calling this method.
void lock_request::kill_waiter(void) {
remove_from_lock_requests();
complete(DB_LOCK_NOTGRANTED);
toku_cond_broadcast(&m_wait_cond);
}
// Scans the pending lock requests of locktree lt, under its lock request
// info mutex, and kills the first request whose extra pointer equals extra.
// At most one request is killed per call; entries that cannot be fetched
// are skipped.
void lock_request::kill_waiter(locktree *lt, void *extra) {
    lt_lock_request_info *info = lt->get_lock_request_info();
    toku_mutex_lock(&info->mutex);
    bool killed = false;
    for (size_t pos = 0; !killed && pos < info->pending_lock_requests.size(); pos++) {
        lock_request *candidate;
        if (info->pending_lock_requests.fetch(pos, &candidate) != 0) {
            continue;
        }
        if (candidate->get_extra() == extra) {
            candidate->kill_waiter();
            killed = true;  // stop scanning: the pending set was just modified
        }
    }
    toku_mutex_unlock(&info->mutex);
}
......@@ -364,9 +433,7 @@ void lock_request::insert_into_lock_requests(void) {
invariant(r == DB_NOTFOUND);
r = m_info->pending_lock_requests.insert_at(this, idx);
invariant_zero(r);
// ensure that this bit is true, now that at least one lock request is in the set
m_info->should_retry_lock_requests = true;
m_info->pending_is_empty = false;
}
// remove this lock request from the locktree's set. must hold the mutex.
......@@ -378,6 +445,8 @@ void lock_request::remove_from_lock_requests(void) {
invariant(request == this);
r = m_info->pending_lock_requests.delete_at(idx);
invariant_zero(r);
if (m_info->pending_lock_requests.size() == 0)
m_info->pending_is_empty = true;
}
int lock_request::find_by_txnid(lock_request * const &request, const TXNID &txnid) {
......@@ -395,6 +464,10 @@ void lock_request::set_start_test_callback(void (*f)(void)) {
m_start_test_callback = f;
}
// Installs a test hook. start() invokes it, when set, just before taking the
// lock request info mutex and inserting the request into the pending set.
void lock_request::set_start_before_pending_test_callback(void (*f)(void)) {
m_start_before_pending_test_callback = f;
}
// Installs a test hook. retry() invokes it, when set, right after calling
// complete() and before broadcasting on m_wait_cond.
void lock_request::set_retry_test_callback(void (*f)(void)) {
m_retry_test_callback = f;
}
......
......@@ -78,7 +78,7 @@ class lock_request {
// effect: Resets the lock request parameters, allowing it to be reused.
// requires: Lock request was already created at some point
void set(locktree *lt, TXNID txnid, const DBT *left_key, const DBT *right_key, type lock_type, bool big_txn);
void set(locktree *lt, TXNID txnid, const DBT *left_key, const DBT *right_key, type lock_type, bool big_txn, void *extra = nullptr);
// effect: Tries to acquire a lock described by this lock request.
// returns: The return code of locktree::acquire_[write,read]_lock()
......@@ -89,7 +89,8 @@ class lock_request {
// returns: The return code of locktree::acquire_[write,read]_lock()
// or simply DB_LOCK_NOTGRANTED if the wait time expired.
int wait(uint64_t wait_time_ms);
int wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*killed_callback)(void));
int wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*killed_callback)(void),
void (*lock_wait_callback)(void *, TXNID, TXNID) = nullptr);
// return: left end-point of the lock range
const DBT *get_left_key(void) const;
......@@ -109,12 +110,18 @@ class lock_request {
// effect: Retries all of the lock requests for the given locktree.
// Any lock requests successfully restarted is completed and woken up.
// The rest remain pending.
static void retry_all_lock_requests(locktree *lt);
static void retry_all_lock_requests(locktree *lt, void (*lock_wait_callback)(void *, TXNID, TXNID) = nullptr, void (*after_retry_test_callback)(void) = nullptr);
void set_start_test_callback(void (*f)(void));
void set_start_before_pending_test_callback(void (*f)(void));
void set_retry_test_callback(void (*f)(void));
private:
void *get_extra(void) const;
void kill_waiter(void);
static void kill_waiter(locktree *lt, void *extra);
private:
enum state {
UNINITIALIZED,
INITIALIZED,
......@@ -152,9 +159,11 @@ class lock_request {
// locktree that this lock request is for.
struct lt_lock_request_info *m_info;
void *m_extra;
// effect: tries again to acquire the lock described by this lock request
// returns: 0 if retrying the request succeeded and is now complete
int retry(void);
int retry(GrowableArray<TXNID> *conflict_collector);
void complete(int complete_r);
......@@ -186,7 +195,13 @@ class lock_request {
static int find_by_txnid(lock_request * const &request, const TXNID &txnid);
// Report list of conflicts to lock wait callback.
static void report_waits(GrowableArray<TXNID> *wait_conflicts,
void (*lock_wait_callback)(void *, TXNID, TXNID));
void add_conflicts_to_waits(txnid_set *conflicts, GrowableArray<TXNID> *wait_conflicts);
void (*m_start_test_callback)(void);
void (*m_start_before_pending_test_callback)(void);
void (*m_retry_test_callback)(void);
friend class lock_request_unit_test;
......
......@@ -81,20 +81,14 @@ void locktree::create(locktree_manager *mgr, DICTIONARY_ID dict_id, const compar
m_sto_end_early_time = 0;
m_lock_request_info.pending_lock_requests.create();
m_lock_request_info.pending_is_empty = true;
ZERO_STRUCT(m_lock_request_info.mutex);
toku_mutex_init(&m_lock_request_info.mutex, nullptr);
m_lock_request_info.should_retry_lock_requests = false;
m_lock_request_info.retry_want = m_lock_request_info.retry_done = 0;
ZERO_STRUCT(m_lock_request_info.counters);
// Threads read the should retry bit without a lock
// for performance. It's ok to read the wrong value.
// - If you think you should but you shouldn't, you waste a little time.
// - If you think you shouldn't but you should, then some other thread
// will come around to do the work of retrying requests instead of you.
TOKU_VALGRIND_HG_DISABLE_CHECKING(
&m_lock_request_info.should_retry_lock_requests,
sizeof(m_lock_request_info.should_retry_lock_requests));
TOKU_DRD_IGNORE_VAR(m_lock_request_info.should_retry_lock_requests);
TOKU_VALGRIND_HG_DISABLE_CHECKING(&m_lock_request_info.pending_is_empty, sizeof(m_lock_request_info.pending_is_empty));
TOKU_DRD_IGNORE_VAR(m_lock_request_info.pending_is_empty);
}
void locktree::destroy(void) {
......
......@@ -38,6 +38,8 @@ Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
#pragma once
#include <atomic>
#include <db.h>
#include <toku_time.h>
#include <toku_pthread.h>
......@@ -80,9 +82,11 @@ namespace toku {
// Lock request state for some locktree
struct lt_lock_request_info {
omt<lock_request *> pending_lock_requests;
std::atomic_bool pending_is_empty;
toku_mutex_t mutex;
bool should_retry_lock_requests;
lt_counters counters;
std::atomic_ullong retry_want;
unsigned long long retry_done;
};
// The locktree manager manages a set of locktrees, one for each open dictionary.
......@@ -159,6 +163,8 @@ namespace toku {
// Add time t to the escalator's wait time statistics
void add_escalator_wait_time(uint64_t t);
void kill_waiter(void *extra);
private:
static const uint64_t DEFAULT_MAX_LOCK_MEMORY = 64L * 1024 * 1024;
......
......@@ -483,4 +483,17 @@ void locktree_manager::get_status(LTM_STATUS statp) {
*statp = ltm_status;
}
void locktree_manager::kill_waiter(void *extra) {
mutex_lock();
int r = 0;
size_t num_locktrees = m_locktree_map.size();
for (size_t i = 0; i < num_locktrees; i++) {
locktree *lt;
r = m_locktree_map.fetch(i, &lt);
invariant_zero(r);
lock_request::kill_waiter(lt, extra);
}
mutex_unlock();
}
} /* namespace toku */
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
// test the lock manager kill waiter function
#include "locktree.h"
#include "lock_request.h"
#include "test.h"
#include "locktree_unit_test.h"
#include <thread>
#include <atomic>
namespace toku {
// Passed to lock_request::wait() as wait_time_ms.
const uint64_t my_lock_wait_time = 1000 * 1000;
// Passed to lock_request::wait() as killed_time_ms.
const uint64_t my_killed_time = 500 * 1000;
// Number of lock requests in the test: the last one acquires the lock and
// the other n_locks-1 each get a waiter thread.
const int n_locks = 4;
// Killed callback handed to lock_request::wait(). It traces each invocation
// to stderr and always returns 0.
static int my_killed_callback(void) {
    const bool trace = true;  // tracing switch; always on
    if (trace) {
        fprintf(stderr, "%s:%u %s\n", __FILE__, __LINE__, __FUNCTION__);
    }
    return 0;
}
// Releases the lock that txn_id holds on the range [left, right] in lt.
// release_locks() takes a range_buffer, so a temporary one is built that
// holds just this one range.
static void locktree_release_lock(locktree *lt, TXNID txn_id, const DBT *left, const DBT *right) {
    range_buffer single_range;
    single_range.create();
    single_range.append(left, right);

    lt->release_locks(txn_id, &single_range);

    single_range.destroy();
}
// Thread body for a waiter: blocks in lr->wait(), asserts that the wait
// ended with DB_LOCK_NOTGRANTED, then sets *done to 1.
static void wait_lock(lock_request *lr, std::atomic_int *done) {
    const int wait_result = lr->wait(my_lock_wait_time, my_killed_time, my_killed_callback);
    assert(wait_result == DB_LOCK_NOTGRANTED);
    done->store(1);
}
// Exercises locktree_manager::kill_waiter().
//
// Transaction n_locks takes a write lock on key 1. Transactions 1..n_locks-1
// request the same lock, are refused, and each blocks in its own waiter
// thread. The waiters are then killed one at a time by their extra tag
// (the address of their std::thread slot). After each kill, the test checks
// that the targeted waiter finished and that the later waiters are still
// blocked.
static void test_kill_waiter(void) {
    int r;

    locktree_manager mgr;
    mgr.create(nullptr, nullptr, nullptr, nullptr);

    DICTIONARY_ID dict_id = { 1 };
    locktree *lt = mgr.get_lt(dict_id, dbt_comparator, nullptr);

    const DBT *one = get_dbt(1);

    lock_request locks[n_locks];
    std::thread waiters[n_locks-1];
    for (int i = 0; i < n_locks; i++) {
        // Only the first n_locks-1 requests have a waiter thread slot. The
        // last request is the lock holder and is never a kill target, so it
        // gets no tag rather than an out-of-range waiters[] subscript.
        void *extra = (i < n_locks-1) ? &waiters[i] : nullptr;
        locks[i].create();
        locks[i].set(lt, i+1, one, one, lock_request::type::WRITE, false, extra);
    }

    // txn 'n_locks' grabs the lock
    r = locks[n_locks-1].start();
    assert_zero(r);

    // every other txn conflicts with the holder
    for (int i = 0; i < n_locks-1; i++) {
        r = locks[i].start();
        assert(r == DB_LOCK_NOTGRANTED);
    }

    // one waiter thread per refused request
    std::atomic_int done[n_locks-1];
    for (int i = 0; i < n_locks-1; i++) {
        done[i] = 0;
        waiters[i] = std::thread(wait_lock, &locks[i], &done[i]);
    }

    for (int i = 0; i < n_locks-1; i++) {
        assert(!done[i]);
    }

    // presumably gives the waiter threads time to block inside wait()
    sleep(1);

    for (int i = 0; i < n_locks-1; i++) {
        mgr.kill_waiter(&waiters[i]);
        while (!done[i]) sleep(1);
        waiters[i].join();
        // killing waiter i must not have released any later waiter
        for (int j = i+1; j < n_locks-1; j++)
            assert(!done[j]);
    }

    locktree_release_lock(lt, n_locks, one, one);

    for (int i = 0; i < n_locks; i++) {
        locks[i].destroy();
    }

    mgr.release_lt(lt);
    mgr.destroy();
}
} /* namespace toku */
// Test entry point: runs the kill-waiter test and returns 0.
int main(void) {
toku::test_kill_waiter();
return 0;
}
......@@ -51,8 +51,9 @@ static uint64_t t_do_kill;
static int my_killed_callback(void) {
uint64_t t_now = toku_current_time_microsec();
if (t_now == t_last_kill)
return 0;
assert(t_now >= t_last_kill);
assert(t_now - t_last_kill >= my_killed_time * 1000 / 2); // div by 2 for valgrind which is not very accurate
t_last_kill = t_now;
killed_calls++;
if (t_now >= t_do_kill)
......
......@@ -52,7 +52,6 @@ static uint64_t t_last_kill;
static int my_killed_callback(void) {
uint64_t t_now = toku_current_time_microsec();
assert(t_now >= t_last_kill);
assert(t_now - t_last_kill >= my_killed_time * 1000 / 2); // div by 2 for valgrind which is not very accurate
t_last_kill = t_now;
killed_calls++;
return 0;
......
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
// test the race between start, release, and wait. since start does not put its
// lock request into the pending set, the blocking txn could release its lock before
// the first txn waits. this will block the first txn because its lock request is
// not known when the lock is released. the bug fix is to try again when lock retries
// are locked out.
#include "locktree.h"
#include "lock_request.h"
#include "test.h"
#include "locktree_unit_test.h"
#include <thread>
#include <atomic>
namespace toku {
const uint64_t my_lock_wait_time = 1000 * 1000; // ms
const uint64_t my_killed_time = 1 * 1000; // ms
// Timestamp from toku_current_time_microsec(), recorded by the test just
// before it waits on the blocked request. my_killed_callback() measures
// elapsed time from it.
static uint64_t t_wait;
// Killed callback for the wait under test. It aborts the process if it is
// invoked my_killed_time (ms) or more after t_wait was recorded; otherwise
// it returns 0.
static int my_killed_callback(void) {
    const uint64_t now_us = toku_current_time_microsec();
    assert(now_us >= t_wait);

    const uint64_t elapsed_us = now_us - t_wait;
    const uint64_t limit_us = my_killed_time*1000;
    if (elapsed_us >= limit_us) {
        abort();
    }
    return 0;
}
// Drops txn_id's lock on [left, right] in lt by handing release_locks() a
// temporary range_buffer that holds only that range.
static void locktree_release_lock(locktree *lt, TXNID txn_id, const DBT *left, const DBT *right) {
    range_buffer ranges;
    ranges.create();
    ranges.append(left, right);

    lt->release_locks(txn_id, &ranges);

    ranges.destroy();
}
static void test_start_release_wait(void) {
int r;
locktree_manager mgr;
mgr.create(nullptr, nullptr, nullptr, nullptr);
DICTIONARY_ID dict_id = { 1 };
locktree *lt = mgr.get_lt(dict_id, dbt_comparator, nullptr);
const DBT *one = get_dbt(1);
// a locks one
lock_request a;
a.create();
a.set(lt, 1, one, one, lock_request::type::WRITE, false);
r = a.start();
assert(r == 0);
// b tries to lock one, fails
lock_request b;
b.create();
b.set(lt, 2, one, one, lock_request::type::WRITE, false);
r = b.start();
assert(r == DB_LOCK_NOTGRANTED);
// a releases its lock
locktree_release_lock(lt, 1, one, one);
// b waits for one, gets locks immediately
t_wait = toku_current_time_microsec();
r = b.wait(my_lock_wait_time, my_killed_time, my_killed_callback);
assert(r == 0);
// b releases its lock so we can exit cleanly
locktree_release_lock(lt, 2, one, one);
a.destroy();
b.destroy();
mgr.release_lt(lt);
mgr.destroy();
}
} /* namespace toku */
// Test entry point: runs the start/release/wait race test and returns 0.
int main(void) {
toku::test_start_release_wait();
return 0;
}
......@@ -37,6 +37,7 @@ Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
#include <iostream>
#include <thread>
#include "test.h"
#include "locktree.h"
#include "lock_request.h"
......@@ -47,15 +48,6 @@ Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
namespace toku {
struct locker_arg {
locktree *_lt;
TXNID _id;
const DBT *_key;
locker_arg(locktree *lt, TXNID id, const DBT *key) : _lt(lt), _id(id), _key(key) {
}
};
static void locker_callback(void) {
usleep(10000);
}
......@@ -97,20 +89,13 @@ static void run_locker(locktree *lt, TXNID txnid, const DBT *key) {
toku_pthread_yield();
if ((i % 10) == 0)
std::cout << toku_pthread_self() << " " << i << std::endl;
std::cout << std::this_thread::get_id() << " " << i << std::endl;
}
}
static void *locker(void *v_arg) {
locker_arg *arg = static_cast<locker_arg *>(v_arg);
run_locker(arg->_lt, arg->_id, arg->_key);
return arg;
}
} /* namespace toku */
int main(void) {
int r;
toku::locktree lt;
DICTIONARY_ID dict_id = { 1 };
......@@ -119,18 +104,12 @@ int main(void) {
const DBT *one = toku::get_dbt(1);
const int n_workers = 2;
toku_pthread_t ids[n_workers];
std::thread worker[n_workers];
for (int i = 0; i < n_workers; i++) {
toku::locker_arg *arg = new toku::locker_arg(&lt, i, one);
r = toku_pthread_create(&ids[i], nullptr, toku::locker, arg);
assert_zero(r);
worker[i] = std::thread(toku::run_locker, &lt, i, one);
}
for (int i = 0; i < n_workers; i++) {
void *ret;
r = toku_pthread_join(ids[i], &ret);
assert_zero(r);
toku::locker_arg *arg = static_cast<toku::locker_arg *>(ret);
delete arg;
worker[i].join();
}
lt.release_reference();
......
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*======
This file is part of PerconaFT.
Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
PerconaFT is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2,
as published by the Free Software Foundation.
PerconaFT is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
----------------------------------------
PerconaFT is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License, version 3,
as published by the Free Software Foundation.
PerconaFT is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
======= */
#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
#include <iostream>
#include <thread>
#include <pthread.h>
#include "test.h"
#include "locktree.h"
#include "lock_request.h"
// Suppose that 3 threads are running a lock acquire, release, retry sequence. There is
// a race in the retry algorithm with 2 threads running lock retry simultaneously. The
// first thread to run retry sets a flag that will cause the second thread to skip the
// lock retries. If the first thread progressed past the contended lock, then the second
// thread will HANG until its lock timer pops, even when the contended lock is no longer held.
// This test exposes this problem as a test hang. The group retry algorithm fixes the race
// in the lock request retry algorithm and this test should no longer hang.
namespace toku {
// use 1000 when after_retry_all is implemented, otherwise use 100000
static const int n_tests = 1000; // 100000;
// Test hook passed to lock_request::retry_all_lock_requests() as its
// after-retry callback; it sleeps for 10000 microseconds.
// NOTE(review): presumably this delay widens the race window the test is
// trying to hit — confirm.
static void after_retry_all(void) {
usleep(10000);
}
// Worker body. For n_tests rounds it synchronizes with the other workers on
// barrier b, then tries to take a write lock on key as txnid, waiting up to
// 1000 * 1000 ms if the lock is not granted immediately. On success it
// releases the lock and retries all pending lock requests, with
// after_retry_all as the post-retry test hook. Progress is printed every
// 10th round.
static void run_locker(locktree *lt, TXNID txnid, const DBT *key, pthread_barrier_t *b) {
    for (int round = 0; round < n_tests; round++) {
        // all workers begin each round together
        int r = pthread_barrier_wait(b);
        assert(r == 0 || r == PTHREAD_BARRIER_SERIAL_THREAD);

        lock_request request;
        request.create();
        request.set(lt, txnid, key, key, lock_request::type::WRITE, false);

        // try to acquire the lock; fall back to waiting if it is contended
        r = request.start();
        if (r == DB_LOCK_NOTGRANTED) {
            r = request.wait(1000 * 1000);
        }

        const bool granted = (r == 0);
        if (granted) {
            // release the lock
            range_buffer held;
            held.create();
            held.append(key, key);
            lt->release_locks(txnid, &held);
            held.destroy();

            // retry pending lock requests
            lock_request::retry_all_lock_requests(lt, nullptr, after_retry_all);
        }

        request.destroy();
        // overwrite the destroyed request's bytes with 0xab
        memset(&request, 0xab, sizeof request);

        toku_pthread_yield();

        const bool report = (round % 10) == 0;
        if (report) {
            std::cout << std::this_thread::get_id() << " " << round << std::endl;
        }
    }
}
} /* namespace toku */
// Test entry point: creates one locktree, runs three run_locker workers
// (txnids 0..2) that contend on key 1 and rendezvous on a shared pthread
// barrier, joins them, then tears everything down.
int main(void) {
    const int n_workers = 3;

    toku::locktree lt;
    DICTIONARY_ID dict_id = { 1 };
    lt.create(nullptr, dict_id, toku::dbt_comparator);

    const DBT *one = toku::get_dbt(1);

    pthread_barrier_t b;
    int r = pthread_barrier_init(&b, nullptr, n_workers);
    assert(r == 0);

    std::thread worker[n_workers];
    for (int i = 0; i < n_workers; i++) {
        worker[i] = std::thread(toku::run_locker, &lt, i, one, &b);
    }
    for (std::thread &w : worker) {
        w.join();
    }

    r = pthread_barrier_destroy(&b);
    assert(r == 0);

    lt.release_reference();
    lt.destroy();
    return 0;
}
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*======
This file is part of PerconaFT.
Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
PerconaFT is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2,
as published by the Free Software Foundation.
PerconaFT is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
----------------------------------------
PerconaFT is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License, version 3,
as published by the Free Software Foundation.
PerconaFT is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
======= */
#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
#include <iostream>
#include <thread>
#include <pthread.h>
#include "test.h"
#include "locktree.h"
#include "lock_request.h"
// Suppose that 2 threads are running a lock acquire, release, retry sequence. There is a
// race between the acquire and the release with 2 threads. If thread 1 acquires a lock,
// and thread 2 tries to acquire the same lock and fails, thread 1 may release its lock and retry
// pending lock requests BEFORE thread 2 adds itself to the pending lock requests. If this
// happens, then thread 2 will HANG until its lock timer expires even when the lock it is
// waiting for is FREE.
// This test exposes this problem as a test hang. If the race is fixed, then the test runs to
// completion.
namespace toku {

// Test callback installed on every lock request via
// set_start_before_pending_test_callback(); it simply sleeps for
// 10000 microseconds.
static void start_before_pending(void) {
    usleep(10000);
}

// Worker body: repeats an acquire / release / retry cycle on [key, key]
// 100000 times.
//
//   lt    - locktree the write lock is requested from
//   txnid - transaction id used for the lock request and the release
//   key   - both endpoints of the requested range
//   b     - barrier that all workers wait on at the start of each iteration
static void run_locker(locktree *lt, TXNID txnid, const DBT *key, pthread_barrier_t *b) {
    const int n_iterations = 100000;
    // If the callback is included, then the race is easy to reproduce.
    // Otherwise, several test runs may be required before the race happens.
    const bool install_test_callback = true;

    for (int iter = 0; iter < n_iterations; iter++) {
        // Line up with the other workers before contending for the lock.
        int rc = pthread_barrier_wait(b);
        assert(rc == 0 || rc == PTHREAD_BARRIER_SERIAL_THREAD);

        lock_request request;
        request.create();
        request.set(lt, txnid, key, key, lock_request::type::WRITE, false);
        if (install_test_callback) {
            request.set_start_before_pending_test_callback(start_before_pending);
        }

        // Try to take the lock; if it is not granted right away, wait for it.
        rc = request.start();
        if (rc == DB_LOCK_NOTGRANTED) {
            rc = request.wait(1000 * 1000);
        }

        if (rc == 0) {
            // We own the lock: release it ...
            range_buffer released;
            released.create();
            released.append(key, key);
            lt->release_locks(txnid, &released);
            released.destroy();

            // ... and retry the pending lock requests.
            lock_request::retry_all_lock_requests(lt);
        }

        request.destroy();
        // Overwrite the destroyed request's storage with a 0xab byte pattern.
        memset(&request, 0xab, sizeof request);

        toku_pthread_yield();

        // Progress output on every tenth iteration.
        if ((iter % 10) == 0) {
            std::cout << std::this_thread::get_id() << " " << iter << std::endl;
        }
    }
}

} /* namespace toku */
int main(void) {
toku::locktree lt;
DICTIONARY_ID dict_id = { 1 };
lt.create(nullptr, dict_id, toku::dbt_comparator);
const DBT *one = toku::get_dbt(1);
const int n_workers = 2;
std::thread worker[n_workers];
pthread_barrier_t b;
int r = pthread_barrier_init(&b, nullptr, n_workers); assert(r == 0);
for (int i = 0; i < n_workers; i++) {
worker[i] = std::thread(toku::run_locker, &lt, i, one, &b);
}
for (int i = 0; i < n_workers; i++) {
worker[i].join();
}
r = pthread_barrier_destroy(&b); assert(r == 0);
lt.release_reference();
lt.destroy();
return 0;
}
......@@ -55,7 +55,8 @@ static int iterate_callback(DB_TXN *txn,
iterate_row_locks_callback iterate_locks,
void *locks_extra, void *extra) {
uint64_t txnid = txn->id64(txn);
uint64_t client_id = txn->get_client_id(txn);
uint64_t client_id; void *client_extra;
txn->get_client_id(txn, &client_id, &client_extra);
iterate_extra *info = reinterpret_cast<iterate_extra *>(extra);
DB *db;
DBT left_key, right_key;
......@@ -93,13 +94,13 @@ int test_main(int UU(argc), char *const UU(argv[])) {
r = env->open(env, TOKU_TEST_FILENAME, env_flags, 0755); CKERR(r);
r = env->txn_begin(env, NULL, &txn1, 0); CKERR(r);
txn1->set_client_id(txn1, 0);
txn1->set_client_id(txn1, 0, NULL);
txnid1 = txn1->id64(txn1);
r = env->txn_begin(env, NULL, &txn2, 0); CKERR(r);
txn2->set_client_id(txn2, 1);
txn2->set_client_id(txn2, 1, NULL);
txnid2 = txn2->id64(txn2);
r = env->txn_begin(env, NULL, &txn3, 0); CKERR(r);
txn3->set_client_id(txn3, 2);
txn3->set_client_id(txn3, 2, NULL);
txnid3 = txn3->id64(txn3);
{
......
......@@ -93,7 +93,8 @@ static int iterate_txns(DB_TXN *txn,
iterate_row_locks_callback iterate_locks,
void *locks_extra, void *extra) {
uint64_t txnid = txn->id64(txn);
uint64_t client_id = txn->get_client_id(txn);
uint64_t client_id; void *client_extra;
txn->get_client_id(txn, &client_id, &client_extra);
invariant_null(extra);
invariant(txnid > 0);
invariant(client_id == 0);
......
......@@ -105,6 +105,7 @@ struct __toku_db_env_internal {
TOKULOGGER logger;
toku::locktree_manager ltm;
lock_timeout_callback lock_wait_timeout_callback; // Called when a lock request times out waiting for a lock.
lock_wait_callback lock_wait_needed_callback; // Called when a lock request requires a wait.
DB *directory; // Maps dnames to inames
DB *persistent_environment; // Stores environment settings, can be used for upgrade
......
......@@ -1804,6 +1804,12 @@ env_set_lock_timeout_callback(DB_ENV *env, lock_timeout_callback callback) {
return 0;
}
// Stores `callback` in the environment's lock_wait_needed_callback slot.
// Always returns 0.
static int
env_set_lock_wait_callback(DB_ENV *env, lock_wait_callback callback) {
    lock_wait_callback *const slot = &env->i->lock_wait_needed_callback;
    *slot = callback;
    return 0;
}
static void
format_time(const time_t *timer, char *buf) {
ctime_r(timer, buf);
......@@ -2620,6 +2626,10 @@ static void env_set_killed_callback(DB_ENV *env, uint64_t default_killed_time_ms
env->i->killed_callback = killed_callback;
}
// Forwards `extra` to kill_waiter() on the environment's locktree manager.
static void env_kill_waiter(DB_ENV *env, void *extra) {
    toku::locktree_manager *const mgr = &env->i->ltm;
    mgr->kill_waiter(extra);
}
static void env_do_backtrace(DB_ENV *env) {
if (env->i->errcall) {
db_env_do_backtrace_errfunc((toku_env_err_func) toku_env_err, (const void *) env);
......@@ -2700,6 +2710,7 @@ toku_env_create(DB_ENV ** envp, uint32_t flags) {
USENV(get_lock_timeout);
USENV(set_lock_timeout);
USENV(set_lock_timeout_callback);
USENV(set_lock_wait_callback);
USENV(set_redzone);
USENV(log_flush);
USENV(log_archive);
......@@ -2719,6 +2730,7 @@ toku_env_create(DB_ENV ** envp, uint32_t flags) {
USENV(set_dir_per_db);
USENV(get_dir_per_db);
USENV(get_data_dir);
USENV(kill_waiter);
#undef USENV
// unlocked methods
......
......@@ -193,7 +193,10 @@ int toku_db_start_range_lock(DB *db, DB_TXN *txn, const DBT *left_key, const DBT
toku::lock_request::type lock_type, toku::lock_request *request) {
DB_TXN *txn_anc = txn_oldest_ancester(txn);
TXNID txn_anc_id = txn_anc->id64(txn_anc);
request->set(db->i->lt, txn_anc_id, left_key, right_key, lock_type, toku_is_big_txn(txn_anc));
uint64_t client_id;
void *client_extra;
txn->get_client_id(txn, &client_id, &client_extra);
request->set(db->i->lt, txn_anc_id, left_key, right_key, lock_type, toku_is_big_txn(txn_anc), client_extra);
const int r = request->start();
if (r == 0) {
......@@ -221,7 +224,8 @@ int toku_db_wait_range_lock(DB *db, DB_TXN *txn, toku::lock_request *request) {
uint64_t killed_time_msec = env->i->default_killed_time_msec;
if (env->i->get_killed_time_callback)
killed_time_msec = env->i->get_killed_time_callback(killed_time_msec);
const int r = request->wait(wait_time_msec, killed_time_msec, env->i->killed_callback);
const int r = request->wait(wait_time_msec, killed_time_msec, env->i->killed_callback,
env->i->lock_wait_needed_callback);
if (r == 0) {
db_txn_note_row_lock(db, txn_anc, left_key, right_key);
} else if (r == DB_LOCK_NOTGRANTED) {
......@@ -248,7 +252,10 @@ void toku_db_grab_write_lock (DB *db, DBT *key, TOKUTXN tokutxn) {
// This lock request must succeed, so we do not want to wait
toku::lock_request request;
request.create();
request.set(db->i->lt, txn_anc_id, key, key, toku::lock_request::type::WRITE, toku_is_big_txn(txn_anc));
uint64_t client_id;
void *client_extra;
txn->get_client_id(txn, &client_id, &client_extra);
request.set(db->i->lt, txn_anc_id, key, key, toku::lock_request::type::WRITE, toku_is_big_txn(txn_anc), client_extra);
int r = request.start();
invariant_zero(r);
db_txn_note_row_lock(db, txn_anc, key, key);
......@@ -268,7 +275,7 @@ void toku_db_release_lt_key_ranges(DB_TXN *txn, txn_lt_key_ranges *ranges) {
// all of our locks have been released, so first try to wake up
// pending lock requests, then release our reference on the lt
toku::lock_request::retry_all_lock_requests(lt);
toku::lock_request::retry_all_lock_requests(lt, txn->mgrp->i->lock_wait_needed_callback);
// Release our reference on this locktree
toku::locktree_manager *ltm = &txn->mgrp->i->ltm;
......
......@@ -323,12 +323,12 @@ int locked_txn_abort(DB_TXN *txn) {
return r;
}
static void locked_txn_set_client_id(DB_TXN *txn, uint64_t client_id) {
toku_txn_set_client_id(db_txn_struct_i(txn)->tokutxn, client_id);
static void locked_txn_set_client_id(DB_TXN *txn, uint64_t client_id, void *client_extra) {
toku_txn_set_client_id(db_txn_struct_i(txn)->tokutxn, client_id, client_extra);
}
static uint64_t locked_txn_get_client_id(DB_TXN *txn) {
return toku_txn_get_client_id(db_txn_struct_i(txn)->tokutxn);
static void locked_txn_get_client_id(DB_TXN *txn, uint64_t *client_id, void **client_extra) {
toku_txn_get_client_id(db_txn_struct_i(txn)->tokutxn, client_id, client_extra);
}
static int toku_txn_discard(DB_TXN *txn, uint32_t flags) {
......
......@@ -533,7 +533,7 @@ typedef struct index_read_info {
static int ai_poll_fun(void *extra, float progress) {
LOADER_CONTEXT context = (LOADER_CONTEXT)extra;
if (thd_killed(context->thd)) {
if (thd_kill_level(context->thd)) {
sprintf(context->write_status_msg, "The process has been killed, aborting add index.");
return ER_ABORTING_CONNECTION;
}
......@@ -548,7 +548,7 @@ static int ai_poll_fun(void *extra, float progress) {
static int loader_poll_fun(void *extra, float progress) {
LOADER_CONTEXT context = (LOADER_CONTEXT)extra;
if (thd_killed(context->thd)) {
if (thd_kill_level(context->thd)) {
sprintf(context->write_status_msg, "The process has been killed, aborting bulk load.");
return ER_ABORTING_CONNECTION;
}
......@@ -3435,7 +3435,7 @@ int ha_tokudb::end_bulk_insert(bool abort) {
ai_metadata_update_required = false;
loader_error = 0;
if (loader) {
if (!abort_loader && !thd_killed(thd)) {
if (!abort_loader && !thd_kill_level(thd)) {
DBUG_EXECUTE_IF("tokudb_end_bulk_insert_sleep", {
const char *orig_proc_info = tokudb_thd_get_proc_info(thd);
thd_proc_info(thd, "DBUG sleep");
......@@ -3445,7 +3445,7 @@ int ha_tokudb::end_bulk_insert(bool abort) {
error = loader->close(loader);
loader = NULL;
if (error) {
if (thd_killed(thd)) {
if (thd_kill_level(thd)) {
my_error(ER_QUERY_INTERRUPTED, MYF(0));
}
goto cleanup;
......@@ -3580,7 +3580,7 @@ int ha_tokudb::is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_in
share->row_count(),
key_info->name);
thd_proc_info(thd, status_msg);
if (thd_killed(thd)) {
if (thd_kill_level(thd)) {
my_error(ER_QUERY_INTERRUPTED, MYF(0));
error = ER_QUERY_INTERRUPTED;
goto cleanup;
......@@ -5245,7 +5245,7 @@ int ha_tokudb::fill_range_query_buf(
// otherwise, if we simply see that the current key is no match,
// we tell the cursor to continue and don't store
// the key locally
if (result == ICP_OUT_OF_RANGE || thd_killed(thd)) {
if (result == ICP_OUT_OF_RANGE || thd_kill_level(thd)) {
icp_went_out_of_range = true;
error = 0;
DEBUG_SYNC(ha_thd(), "tokudb_icp_asc_scan_out_of_range");
......@@ -5613,7 +5613,7 @@ int ha_tokudb::get_next(
static_cast<tokudb_trx_data*>(thd_get_ha_data(thd, tokudb_hton));
trx->stmt_progress.queried++;
track_progress(thd);
if (thd_killed(thd))
if (thd_kill_level(thd))
error = ER_ABORTING_CONNECTION;
}
cleanup:
......@@ -8351,7 +8351,7 @@ int ha_tokudb::tokudb_add_index(
(long long unsigned)share->row_count());
#endif
if (thd_killed(thd)) {
if (thd_kill_level(thd)) {
error = ER_ABORTING_CONNECTION;
goto cleanup;
}
......
......@@ -225,7 +225,7 @@ int recount_rows_t::analyze_recount_rows_progress(
_ticks = 0;
uint64_t now = tokudb::time::microsec();
_total_elapsed_time = now - _recount_start;
if ((_thd && thd_killed(_thd)) || cancelled()) {
if ((_thd && thd_kill_level(_thd)) || cancelled()) {
// client killed
return ER_ABORTING_CONNECTION;
}
......@@ -540,7 +540,7 @@ int standard_t::analyze_key_progress(void) {
uint64_t now = tokudb::time::microsec();
_total_elapsed_time = now - _analyze_start;
_key_elapsed_time = now - _analyze_key_start;
if ((_thd && thd_killed(_thd)) || cancelled()) {
if ((_thd && thd_kill_level(_thd)) || cancelled()) {
// client killed
return ER_ABORTING_CONNECTION;
} else if (_time_limit > 0 &&
......@@ -876,7 +876,7 @@ typedef struct hot_optimize_context {
static int hot_optimize_progress_fun(void *extra, float progress) {
HOT_OPTIMIZE_CONTEXT context = (HOT_OPTIMIZE_CONTEXT)extra;
if (thd_killed(context->thd)) {
if (thd_kill_level(context->thd)) {
sprintf(
context->write_status_msg,
"The process has been killed, aborting hot optimize.");
......@@ -1003,7 +1003,7 @@ struct check_context {
static int ha_tokudb_check_progress(void* extra, float progress) {
struct check_context* context = (struct check_context*)extra;
int result = 0;
if (thd_killed(context->thd))
if (thd_kill_level(context->thd))
result = ER_ABORTING_CONNECTION;
return result;
}
......
......@@ -55,6 +55,7 @@ static bool tokudb_show_status(
static void tokudb_handle_fatal_signal(handlerton* hton, THD* thd, int sig);
#endif
static int tokudb_close_connection(handlerton* hton, THD* thd);
static void tokudb_kill_query(handlerton *hton, THD *thd, enum thd_kill_levels level);
static int tokudb_commit(handlerton* hton, THD* thd, bool all);
static int tokudb_rollback(handlerton* hton, THD* thd, bool all);
#if TOKU_INCLUDE_XA
......@@ -147,6 +148,11 @@ static void tokudb_lock_timeout_callback(
const DBT* right_key,
uint64_t blocking_txnid);
static void tokudb_lock_wait_needed_callback(
void* arg,
uint64_t requesting_txnid,
uint64_t blocking_txnid);
#define ASSERT_MSGLEN 1024
void toku_hton_assert_fail(
......@@ -331,6 +337,7 @@ static int tokudb_init_func(void *p) {
tokudb_hton->create = tokudb_create_handler;
tokudb_hton->close_connection = tokudb_close_connection;
tokudb_hton->kill_query = tokudb_kill_query;
tokudb_hton->savepoint_offset = sizeof(SP_INFO_T);
tokudb_hton->savepoint_set = tokudb_savepoint;
......@@ -532,6 +539,7 @@ static int tokudb_init_func(void *p) {
db_env->set_lock_timeout_callback(db_env, tokudb_lock_timeout_callback);
db_env->set_dir_per_db(db_env, tokudb::sysvars::dir_per_db);
db_env->set_lock_wait_callback(db_env, tokudb_lock_wait_needed_callback);
db_env->set_loader_memory_size(
db_env,
......@@ -754,6 +762,12 @@ static int tokudb_close_connection(handlerton* hton, THD* thd) {
return error;
}
// handlerton kill_query hook: hands the THD to the environment's
// kill_waiter() as its opaque `extra` argument. `hton` and `level` are not
// used.
void tokudb_kill_query(handlerton *hton, THD *thd, enum thd_kill_levels level) {
    TOKUDB_DBUG_ENTER("");
    void *waiter_extra = thd;
    db_env->kill_waiter(db_env, waiter_extra);
    DBUG_VOID_RETURN;
}
bool tokudb_flush_logs(handlerton * hton) {
TOKUDB_DBUG_ENTER("");
int error;
......@@ -873,9 +887,9 @@ static int tokudb_commit(handlerton * hton, THD * thd, bool all) {
tokudb_sync_on_commit(thd, trx, this_txn) ? 0 : DB_TXN_NOSYNC;
TOKUDB_TRACE_FOR_FLAGS(
TOKUDB_DEBUG_TXN,
"commit trx %u txn %p syncflag %u",
"commit trx %u txn %p %" PRIu64 " syncflag %u",
all,
this_txn,
this_txn, this_txn->id64(this_txn),
syncflag);
// test hook to induce a crash on a debug build
DBUG_EXECUTE_IF("tokudb_crash_commit_before", DBUG_SUICIDE(););
......@@ -904,9 +918,9 @@ static int tokudb_rollback(handlerton * hton, THD * thd, bool all) {
if (this_txn) {
TOKUDB_TRACE_FOR_FLAGS(
TOKUDB_DEBUG_TXN,
"rollback %u txn %p",
"rollback %u txn %p %" PRIu64,
all,
this_txn);
this_txn, this_txn->id64(this_txn));
tokudb_cleanup_handlers(trx, this_txn);
abort_txn_with_progress(this_txn, thd);
*txn = NULL;
......@@ -952,9 +966,9 @@ static int tokudb_xa_prepare(handlerton* hton, THD* thd, bool all) {
uint32_t syncflag = tokudb_sync_on_prepare() ? 0 : DB_TXN_NOSYNC;
TOKUDB_TRACE_FOR_FLAGS(
TOKUDB_DEBUG_XA,
"doing txn prepare:%d:%p",
"doing txn prepare:%d:%p %" PRIu64,
all,
txn);
txn, txn->id64(txn));
// a TOKU_XA_XID is identical to a MYSQL_XID
TOKU_XA_XID thd_xid;
thd_get_xid(thd, (MYSQL_XID*) &thd_xid);
......@@ -1570,7 +1584,9 @@ static int tokudb_search_txn_callback(
void* extra) {
uint64_t txn_id = txn->id64(txn);
uint64_t client_id = txn->get_client_id(txn);
uint64_t client_id;
void *client_extra;
txn->get_client_id(txn, &client_id, &client_extra);
struct tokudb_search_txn_extra* e =
reinterpret_cast<struct tokudb_search_txn_extra*>(extra);
if (e->match_txn_id == txn_id) {
......@@ -1748,6 +1764,63 @@ static void tokudb_lock_timeout_callback(
}
}
extern "C" int thd_rpl_deadlock_check(MYSQL_THD thd, MYSQL_THD other_thd);
// Search state passed as `extra` to tokudb_search_txn_thd_callback while
// iterating live transactions: carries the transaction id to look for and
// receives the result of the search.
struct tokudb_search_txn_thd {
// Set to true by the callback when a transaction whose id equals
// match_txn_id is seen.
bool match_found;
// Input: the transaction id being searched for.
uint64_t match_txn_id;
// Output: the matching transaction's client_extra pointer, cast to THD*.
// Only written when match_found is set.
THD *match_client_thd;
};
// Live-transaction iteration callback. `extra` points at a
// tokudb_search_txn_thd; when `txn` has the id being searched for, the
// transaction's client_extra pointer is recorded as the matching THD.
//
// Returns 1 when the transaction matches and 0 otherwise. `iterate_locks`
// and `locks_extra` are not used.
static int tokudb_search_txn_thd_callback(
    DB_TXN* txn,
    iterate_row_locks_callback iterate_locks,
    void* locks_extra,
    void* extra) {

    const uint64_t candidate_id = txn->id64(txn);

    // Only the extra pointer is of interest; the numeric client id is
    // fetched because the getter returns both.
    uint64_t ignored_client_id;
    void* owner;
    txn->get_client_id(txn, &ignored_client_id, &owner);

    struct tokudb_search_txn_thd* search =
        reinterpret_cast<struct tokudb_search_txn_thd*>(extra);
    if (search->match_txn_id != candidate_id) {
        return 0;
    }

    search->match_found = true;
    search->match_client_thd = reinterpret_cast<THD *>(owner);
    return 1;
}
static bool tokudb_txn_id_to_thd(
uint64_t txnid,
THD **out_thd) {
struct tokudb_search_txn_thd e = {
false,
txnid,
0
};
db_env->iterate_live_transactions(db_env, tokudb_search_txn_thd_callback, &e);
if (e.match_found) {
*out_thd = e.match_client_thd;
}
return e.match_found;
}
static void tokudb_lock_wait_needed_callback(
void *arg,
uint64_t requesting_txnid,
uint64_t blocking_txnid) {
THD *requesting_thd;
THD *blocking_thd;
if (tokudb_txn_id_to_thd(requesting_txnid, &requesting_thd) &&
tokudb_txn_id_to_thd(blocking_txnid, &blocking_thd)) {
thd_rpl_deadlock_check (requesting_thd, blocking_thd);
}
}
// Retrieves variables for information_schema.global_status.
// Names (columnname) are automatically converted to upper case,
// and prefixed with "TOKUDB_"
......
......@@ -172,12 +172,12 @@ inline uint64_t tokudb_get_killed_time_callback(uint64_t default_killed_time) {
// Reports the kill state of the current connection (current_thd) as an int.
// NOTE(review): there are two consecutive return statements below; as
// written only the first one (thd_killed) can execute and the second
// (thd_kill_level) is unreachable. This looks like the removed and added
// lines of a diff shown together -- confirm which call is intended.
inline int tokudb_killed_callback(void) {
THD *thd = current_thd;
return thd_killed(thd);
return thd_kill_level(thd);
}
// Reports whether the connection passed in `extra` (cast to THD*) has a
// non-zero kill state. `deleted_rows` is not used.
// NOTE(review): there are two consecutive return statements below; as
// written only the first one (thd_killed) can execute and the second
// (thd_kill_level) is unreachable. This looks like the removed and added
// lines of a diff shown together -- confirm which call is intended.
inline bool tokudb_killed_thd_callback(void *extra, uint64_t deleted_rows) {
THD *thd = static_cast<THD *>(extra);
return thd_killed(thd) != 0;
return thd_kill_level(thd) != 0;
}
......
include/master-slave.inc
[connection master]
ALTER TABLE mysql.gtid_slave_pos ENGINE=TokuDB;
CREATE TABLE t1 (a int PRIMARY KEY, b INT, UNIQUE KEY (b)) ENGINE=TokuDB;
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=10;
CHANGE MASTER TO master_use_gtid=slave_pos;
SET @old_parallel_mode=@@GLOBAL.slave_parallel_mode;
SET GLOBAL slave_parallel_mode='optimistic';
INSERT INTO t1 VALUES(1,1);
BEGIN;
INSERT INTO t1 VALUES(2,2);
INSERT INTO t1 VALUES(3,3);
COMMIT;
DELETE FROM t1 WHERE a=2;
INSERT INTO t1 VALUES (2,2);
DELETE FROM t1 WHERE a=2;
INSERT INTO t1 VALUES (2,6);
DELETE FROM t1 WHERE a=2;
INSERT INTO t1 VALUES (2,4);
DELETE FROM t1 WHERE a=2;
INSERT INTO t1 VALUES (2,5);
DELETE FROM t1 WHERE a=3;
INSERT INTO t1 VALUES(3,3);
DELETE FROM t1 WHERE a=1;
INSERT INTO t1 VALUES(1,4);
DELETE FROM t1 WHERE a=3;
INSERT INTO t1 VALUES(3,3);
DELETE FROM t1 WHERE a=2;
INSERT INTO t1 VALUES (2,6);
include/save_master_gtid.inc
SELECT * FROM t1 ORDER BY a;
a b
1 4
2 6
3 3
include/start_slave.inc
include/sync_with_master_gtid.inc
SELECT * FROM t1 ORDER BY a;
a b
1 4
2 6
3 3
*** Test a bunch of non-transactional/DDL event groups. ***
include/stop_slave.inc
INSERT INTO t1 VALUES (4,8);
INSERT INTO t1 VALUES (5,9);
CREATE TABLE t2 (a INT PRIMARY KEY) ENGINE=TokuDB;
INSERT INTO t2 VALUES (1);
CREATE TABLE t3 (a INT PRIMARY KEY) ENGINE=MyISAM;
ALTER TABLE t2 ADD b INT;
INSERT INTO t2 VALUES (2,2);
ALTER TABLE t2 DROP b;
INSERT INTO t2 VALUES (3);
ALTER TABLE t2 ADD c INT;
INSERT INTO t2 VALUES (4,5);
INSERT INTO t2 VALUES (5,5);
INSERT INTO t3 VALUES (1);
UPDATE t2 SET c=NULL WHERE a=4;
ALTER TABLE t2 ADD UNIQUE (c);
INSERT INTO t2 VALUES (6,6);
UPDATE t2 SET c=c+100 WHERE a=2;
INSERT INTO t3(a) VALUES (2);
DELETE FROM t3 WHERE a=2;
INSERT INTO t3(a) VALUES (2);
DELETE FROM t3 WHERE a=2;
ALTER TABLE t3 CHANGE a c INT NOT NULL;
INSERT INTO t3(c) VALUES (2);
DELETE FROM t3 WHERE c=2;
INSERT INTO t3 SELECT a+200 FROM t2;
DELETE FROM t3 WHERE c >= 200;
INSERT INTO t3 SELECT a+200 FROM t2;
include/save_master_gtid.inc
SELECT * FROM t1 ORDER BY a;
a b
1 4
2 6
3 3
4 8
5 9
SELECT * FROM t2 ORDER BY a;
a c
1 NULL
2 NULL
3 NULL
4 NULL
5 5
6 6
SELECT * FROM t3 ORDER BY c;
c
1
201
202
203
204
205
206
include/start_slave.inc
include/sync_with_master_gtid.inc
SELECT * FROM t1 ORDER BY a;
a b
1 4
2 6
3 3
4 8
5 9
SELECT * FROM t2 ORDER BY a;
a c
1 NULL
2 NULL
3 NULL
4 NULL
5 5
6 6
SELECT * FROM t3 ORDER BY c;
c
1
201
202
203
204
205
206
*** Test @@skip_parallel_replication. ***
include/stop_slave.inc
UPDATE t1 SET b=10 WHERE a=3;
SET SESSION skip_parallel_replication=1;
UPDATE t1 SET b=20 WHERE a=3;
UPDATE t1 SET b=30 WHERE a=3;
UPDATE t1 SET b=50 WHERE a=3;
UPDATE t1 SET b=80 WHERE a=3;
UPDATE t1 SET b=130 WHERE a=3;
UPDATE t1 SET b=210 WHERE a=3;
UPDATE t1 SET b=340 WHERE a=3;
UPDATE t1 SET b=550 WHERE a=3;
UPDATE t1 SET b=890 WHERE a=3;
SET SESSION skip_parallel_replication=0;
SELECT * FROM t1 ORDER BY a;
a b
1 4
2 6
3 890
4 8
5 9
include/save_master_gtid.inc
include/start_slave.inc
include/sync_with_master_gtid.inc
SELECT * FROM t1 ORDER BY a;
a b
1 4
2 6
3 890
4 8
5 9
status
Ok, no retry
*** Test that we do not replicate in parallel transactions that had row lock waits on the master ***
include/stop_slave.inc
BEGIN;
UPDATE t1 SET b=b+1 WHERE a=3;
SET debug_sync='thd_report_wait_for SIGNAL waiting1';
UPDATE t1 SET b=1001 WHERE a=3;
SET debug_sync='now WAIT_FOR waiting1';
BEGIN;
UPDATE t1 SET b=1002 WHERE a=5;
SET debug_sync='thd_report_wait_for SIGNAL waiting2';
UPDATE t1 SET b=102 WHERE a=3;
SET debug_sync='now WAIT_FOR waiting2';
UPDATE t1 SET b=1000 WHERE a=1;
SET debug_sync='thd_report_wait_for SIGNAL waiting3';
UPDATE t1 SET b=1003 WHERE a=5;
SET debug_sync='now WAIT_FOR waiting3';
SET debug_sync='thd_report_wait_for SIGNAL waiting4';
UPDATE t1 SET b=1004 WHERE a=3;
SET debug_sync='now WAIT_FOR waiting4';
SET debug_sync='thd_report_wait_for SIGNAL waiting5';
UPDATE t1 SET b=1005 WHERE a=5;
SET debug_sync='now WAIT_FOR waiting5';
SET debug_sync='thd_report_wait_for SIGNAL waiting6';
UPDATE t1 SET b=1006 WHERE a=1;
SET debug_sync='now WAIT_FOR waiting6';
SET debug_sync='thd_report_wait_for SIGNAL waiting7';
UPDATE t1 SET b=1007 WHERE a=5;
SET debug_sync='now WAIT_FOR waiting7';
SET debug_sync='thd_report_wait_for SIGNAL waiting8';
UPDATE t1 SET b=1008 WHERE a=3;
SET debug_sync='now WAIT_FOR waiting8';
COMMIT;
COMMIT;
SET debug_sync='RESET';
include/save_master_gtid.inc
include/start_slave.inc
include/sync_with_master_gtid.inc
SELECT IF(@master_value=@slave_value, "Slave data matches master", CONCAT("ERROR: Slave had different data '", @slave_value, "' than master's '", @master_value, "'!")) as check_result;
check_result
Slave data matches master
status
Ok, no retry
*** Test that we replicate correctly when using READ COMMITTED and binlog_format=MIXED on the slave ***
include/stop_slave.inc
SET @old_format= @@GLOBAL.binlog_format;
SET GLOBAL binlog_format= MIXED;
SET @old_isolation= @@GLOBAL.tx_isolation;
SET GLOBAL TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
DROP TABLE t1, t2;
CREATE TABLE t1 (a int PRIMARY KEY, b INT) ENGINE=TokuDB;
CREATE TABLE t2 (a int PRIMARY KEY, b INT) ENGINE=TokuDB;
INSERT INTO t1 VALUES (1,0), (2,0), (3,0);
INSERT INTO t2 VALUES (1,0), (2,0);
INSERT INTO t1 SELECT 4, COUNT(*) FROM t2;
INSERT INTO t2 SELECT 4, COUNT(*) FROM t1;
INSERT INTO t1 SELECT 5, COUNT(*) FROM t2;
INSERT INTO t2 SELECT 5, COUNT(*) FROM t1;
INSERT INTO t2 SELECT 6, COUNT(*) FROM t1;
INSERT INTO t1 SELECT 6, COUNT(*) FROM t2;
INSERT INTO t1 SELECT 7, COUNT(*) FROM t2;
INSERT INTO t2 SELECT 7, COUNT(*) FROM t1;
INSERT INTO t2 SELECT 8, COUNT(*) FROM t1;
INSERT INTO t1 SELECT 8, COUNT(*) FROM t2;
INSERT INTO t2 SELECT 9, COUNT(*) FROM t1;
INSERT INTO t1 SELECT 9, COUNT(*) FROM t2;
INSERT INTO t1 SELECT 10, COUNT(*) FROM t2;
INSERT INTO t2 SELECT 10, COUNT(*) FROM t1;
SELECT * FROM t1 ORDER BY a;
a b
1 0
2 0
3 0
4 2
5 3
6 5
7 5
8 7
9 8
10 8
SELECT * FROM t2 ORDER BY a;
a b
1 0
2 0
4 4
5 5
6 5
7 7
8 7
9 8
10 10
include/save_master_gtid.inc
include/start_slave.inc
include/sync_with_master_gtid.inc
SELECT * FROM t1 ORDER BY a;
a b
1 0
2 0
3 0
4 2
5 3
6 5
7 5
8 7
9 8
10 8
SELECT * FROM t2 ORDER BY a;
a b
1 0
2 0
4 4
5 5
6 5
7 7
8 7
9 8
10 10
include/stop_slave.inc
SET GLOBAL binlog_format= @old_format;
SET GLOBAL tx_isolation= @old_isolation;
include/start_slave.inc
*** MDEV-7888: ANALYZE TABLE does wakeup_subsequent_commits(), causing wrong binlog order and parallel replication hang ***
DROP TABLE t1, t2, t3;
CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=TokuDB;
CREATE TABLE t2 (a INT PRIMARY KEY, b INT) ENGINE=TokuDB;
CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=MyISAM;
INSERT INTO t2 VALUES (1,1), (2,1), (3,1), (4,1), (5,1);
include/save_master_gtid.inc
include/sync_with_master_gtid.inc
include/stop_slave.inc
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug= '+d,inject_analyze_table_sleep';
ALTER TABLE t2 COMMENT "123abc";
ANALYZE TABLE t2;
Table Op Msg_type Msg_text
test.t2 analyze status OK
INSERT INTO t1 VALUES (1,2);
INSERT INTO t1 VALUES (2,2);
INSERT INTO t1 VALUES (3,2);
INSERT INTO t1 VALUES (4,2);
INSERT INTO t3 VALUES (1,3);
ALTER TABLE t2 COMMENT "hello, world";
BEGIN;
INSERT INTO t1 VALUES (5,4);
INSERT INTO t1 VALUES (6,4);
INSERT INTO t1 VALUES (7,4);
INSERT INTO t1 VALUES (8,4);
INSERT INTO t1 VALUES (9,4);
INSERT INTO t1 VALUES (10,4);
INSERT INTO t1 VALUES (11,4);
INSERT INTO t1 VALUES (12,4);
INSERT INTO t1 VALUES (13,4);
INSERT INTO t1 VALUES (14,4);
INSERT INTO t1 VALUES (15,4);
INSERT INTO t1 VALUES (16,4);
INSERT INTO t1 VALUES (17,4);
INSERT INTO t1 VALUES (18,4);
INSERT INTO t1 VALUES (19,4);
INSERT INTO t1 VALUES (20,4);
COMMIT;
INSERT INTO t1 VALUES (21,5);
INSERT INTO t1 VALUES (22,5);
SELECT * FROM t1 ORDER BY a;
a b
1 2
2 2
3 2
4 2
5 4
6 4
7 4
8 4
9 4
10 4
11 4
12 4
13 4
14 4
15 4
16 4
17 4
18 4
19 4
20 4
21 5
22 5
SELECT * FROM t2 ORDER BY a;
a b
1 1
2 1
3 1
4 1
5 1
SELECT * FROM t3 ORDER BY a;
a b
1 3
include/save_master_gtid.inc
include/start_slave.inc
include/sync_with_master_gtid.inc
SELECT * FROM t1 ORDER BY a;
a b
1 2
2 2
3 2
4 2
5 4
6 4
7 4
8 4
9 4
10 4
11 4
12 4
13 4
14 4
15 4
16 4
17 4
18 4
19 4
20 4
21 5
22 5
SELECT * FROM t2 ORDER BY a;
a b
1 1
2 1
3 1
4 1
5 1
SELECT * FROM t3 ORDER BY a;
a b
1 3
include/stop_slave.inc
SET GLOBAL debug_dbug= @old_debug;
include/start_slave.inc
*** MDEV-7929: record_gtid() for non-transactional event group calls wakeup_subsequent_commits() too early, causing slave hang. ***
include/stop_slave.inc
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug= '+d,inject_record_gtid_serverid_100_sleep';
ALTER TABLE t3 COMMENT "DDL statement 1";
INSERT INTO t1 VALUES (30,0);
INSERT INTO t1 VALUES (31,0);
INSERT INTO t1 VALUES (32,0);
INSERT INTO t1 VALUES (33,0);
INSERT INTO t1 VALUES (34,0);
INSERT INTO t1 VALUES (35,0);
INSERT INTO t1 VALUES (36,0);
SET @old_server_id= @@SESSION.server_id;
SET SESSION server_id= 100;
ANALYZE TABLE t2;
Table Op Msg_type Msg_text
test.t2 analyze status OK
SET SESSION server_id= @old_server_id;
INSERT INTO t1 VALUES (37,0);
ALTER TABLE t3 COMMENT "DDL statement 2";
INSERT INTO t1 VALUES (38,0);
INSERT INTO t1 VALUES (39,0);
ALTER TABLE t3 COMMENT "DDL statement 3";
SELECT * FROM t1 WHERE a >= 30 ORDER BY a;
a b
30 0
31 0
32 0
33 0
34 0
35 0
36 0
37 0
38 0
39 0
include/save_master_gtid.inc
include/start_slave.inc
include/sync_with_master_gtid.inc
SELECT * FROM t1 WHERE a >= 30 ORDER BY a;
a b
30 0
31 0
32 0
33 0
34 0
35 0
36 0
37 0
38 0
39 0
include/stop_slave.inc
SET GLOBAL debug_dbug= @old_debug;
include/start_slave.inc
*** MDEV-8113: ALTER TABLE causes slave hang in optimistic parallel replication ***
include/stop_slave.inc
ALTER TABLE t2 ADD c INT;
INSERT INTO t2 (a,b) VALUES (50, 0);
INSERT INTO t2 (a,b) VALUES (51, 1);
INSERT INTO t2 (a,b) VALUES (52, 2);
INSERT INTO t2 (a,b) VALUES (53, 3);
INSERT INTO t2 (a,b) VALUES (54, 4);
INSERT INTO t2 (a,b) VALUES (55, 5);
INSERT INTO t2 (a,b) VALUES (56, 6);
INSERT INTO t2 (a,b) VALUES (57, 7);
INSERT INTO t2 (a,b) VALUES (58, 8);
INSERT INTO t2 (a,b) VALUES (59, 9);
ALTER TABLE t2 DROP COLUMN c;
SELECT * FROM t2 WHERE a >= 50 ORDER BY a;
a b
50 0
51 1
52 2
53 3
54 4
55 5
56 6
57 7
58 8
59 9
include/save_master_gtid.inc
include/start_slave.inc
include/sync_with_master_gtid.inc
SELECT * FROM t2 WHERE a >= 50 ORDER BY a;
a b
50 0
51 1
52 2
53 3
54 4
55 5
56 6
57 7
58 8
59 9
include/stop_slave.inc
SET GLOBAL slave_parallel_mode=@old_parallel_mode;
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
include/start_slave.inc
DROP TABLE t1, t2, t3;
include/rpl_end.inc
--source include/have_tokudb.inc
--source include/have_debug.inc
--source include/have_debug_sync.inc
--source include/master-slave.inc
--connection master
ALTER TABLE mysql.gtid_slave_pos ENGINE=TokuDB;
CREATE TABLE t1 (a int PRIMARY KEY, b INT, UNIQUE KEY (b)) ENGINE=TokuDB;
--save_master_pos
--connection slave
--sync_with_master
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=10;
CHANGE MASTER TO master_use_gtid=slave_pos;
SET @old_parallel_mode=@@GLOBAL.slave_parallel_mode;
SET GLOBAL slave_parallel_mode='optimistic';
--connection master
INSERT INTO t1 VALUES(1,1);
BEGIN;
INSERT INTO t1 VALUES(2,2);
INSERT INTO t1 VALUES(3,3);
COMMIT;
# Do a bunch of INSERT/DELETE on the same rows, bound to conflict.
# We will get a lot of rollbacks, probably, but they should be handled without
# any visible errors.
DELETE FROM t1 WHERE a=2;
INSERT INTO t1 VALUES (2,2);
DELETE FROM t1 WHERE a=2;
INSERT INTO t1 VALUES (2,6);
DELETE FROM t1 WHERE a=2;
INSERT INTO t1 VALUES (2,4);
DELETE FROM t1 WHERE a=2;
INSERT INTO t1 VALUES (2,5);
DELETE FROM t1 WHERE a=3;
INSERT INTO t1 VALUES(3,3);
DELETE FROM t1 WHERE a=1;
INSERT INTO t1 VALUES(1,4);
DELETE FROM t1 WHERE a=3;
INSERT INTO t1 VALUES(3,3);
DELETE FROM t1 WHERE a=2;
INSERT INTO t1 VALUES (2,6);
--source include/save_master_gtid.inc
SELECT * FROM t1 ORDER BY a;
--connection slave
--source include/start_slave.inc
--source include/sync_with_master_gtid.inc
SELECT * FROM t1 ORDER BY a;
#SHOW STATUS LIKE 'Slave_retried_transactions';
# Section: replicate a mix of TokuDB DML, MyISAM DML and DDL statements.
# The slave is stopped while the master executes the statements, then it is
# restarted and synced to the saved master GTID position. t1, t2 and t3 are
# selected on both servers so the recorded output shows the resulting rows.
--echo *** Test a bunch of non-transactional/DDL event groups. ***
--connection slave
--source include/stop_slave.inc
--connection master
INSERT INTO t1 VALUES (4,8);
INSERT INTO t1 VALUES (5,9);
# t2 uses the TokuDB engine, t3 uses MyISAM.
CREATE TABLE t2 (a INT PRIMARY KEY) ENGINE=TokuDB;
INSERT INTO t2 VALUES (1);
CREATE TABLE t3 (a INT PRIMARY KEY) ENGINE=MyISAM;
# Column add/drop on t2 interleaved with inserts whose column count matches
# the table definition in effect at that point.
ALTER TABLE t2 ADD b INT;
INSERT INTO t2 VALUES (2,2);
ALTER TABLE t2 DROP b;
INSERT INTO t2 VALUES (3);
ALTER TABLE t2 ADD c INT;
INSERT INTO t2 VALUES (4,5);
INSERT INTO t2 VALUES (5,5);
INSERT INTO t3 VALUES (1);
# Rows a=4 and a=5 were both inserted with c=5; clear one of them before the
# UNIQUE index on c is added.
UPDATE t2 SET c=NULL WHERE a=4;
ALTER TABLE t2 ADD UNIQUE (c);
INSERT INTO t2 VALUES (6,6);
UPDATE t2 SET c=c+100 WHERE a=2;
# Repeated insert/delete of the same MyISAM row.
INSERT INTO t3(a) VALUES (2);
DELETE FROM t3 WHERE a=2;
INSERT INTO t3(a) VALUES (2);
DELETE FROM t3 WHERE a=2;
# Rename column a to c in t3; later statements refer to it as c.
ALTER TABLE t3 CHANGE a c INT NOT NULL;
INSERT INTO t3(c) VALUES (2);
DELETE FROM t3 WHERE c=2;
# Statements on t3 that read from t2.
INSERT INTO t3 SELECT a+200 FROM t2;
DELETE FROM t3 WHERE c >= 200;
INSERT INTO t3 SELECT a+200 FROM t2;
--source include/save_master_gtid.inc
# Master-side contents.
SELECT * FROM t1 ORDER BY a;
SELECT * FROM t2 ORDER BY a;
SELECT * FROM t3 ORDER BY c;
--connection slave
--source include/start_slave.inc
--source include/sync_with_master_gtid.inc
# Slave-side contents after catching up.
SELECT * FROM t1 ORDER BY a;
SELECT * FROM t2 ORDER BY a;
SELECT * FROM t3 ORDER BY c;
#SHOW STATUS LIKE 'Slave_retried_transactions';
# Section: updates to the same row made on the master with
# skip_parallel_replication=1. Slave_retried_transactions is sampled on the
# slave before and after replication and the two samples are compared.
--echo *** Test @@skip_parallel_replication. ***
--connection slave
--source include/stop_slave.inc
# Retry counter before replication.
--let $retry1= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1)
--connection master
# We do a bunch of conflicting transactions on the master with
# skip_parallel_replication set to true, and check that we do not
# get any retries on the slave.
# Note that this first update runs before the session variable is set.
UPDATE t1 SET b=10 WHERE a=3;
SET SESSION skip_parallel_replication=1;
UPDATE t1 SET b=20 WHERE a=3;
UPDATE t1 SET b=30 WHERE a=3;
UPDATE t1 SET b=50 WHERE a=3;
UPDATE t1 SET b=80 WHERE a=3;
UPDATE t1 SET b=130 WHERE a=3;
UPDATE t1 SET b=210 WHERE a=3;
UPDATE t1 SET b=340 WHERE a=3;
UPDATE t1 SET b=550 WHERE a=3;
UPDATE t1 SET b=890 WHERE a=3;
SET SESSION skip_parallel_replication=0;
SELECT * FROM t1 ORDER BY a;
--source include/save_master_gtid.inc
--connection slave
--source include/start_slave.inc
--source include/sync_with_master_gtid.inc
SELECT * FROM t1 ORDER BY a;
# Retry counter after replication.
--let $retry2= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1)
# The query log is disabled around the comparison, so the evaluated statement
# text (which contains the counter values) is not echoed; only the result is.
--disable_query_log
eval SELECT IF($retry1=$retry2, "Ok, no retry",
CONCAT("ERROR: ", $retry2-$retry1, " retries during replication (was ",
$retry1, " now ", $retry2, ")")) AS status;
--enable_query_log
# Section: on the master, eight extra connections (m1..m8) each issue an UPDATE
# on a row that another connection has already updated in a still-open
# transaction. After everything commits, the slave replicates the result; the
# section then compares master and slave table contents and checks that
# Slave_retried_transactions did not change on the slave.
--echo *** Test that we do not replicate in parallel transactions that had row lock waits on the master ***
--connection slave
--source include/stop_slave.inc
# Retry counter before replication.
--let $retry1= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1)
--connection master
# Setup a bunch of transactions that all needed to wait.
--connect (m1,127.0.0.1,root,,test,$SERVER_MYPORT_1,)
--connect (m2,127.0.0.1,root,,test,$SERVER_MYPORT_1,)
--connect (m3,127.0.0.1,root,,test,$SERVER_MYPORT_1,)
--connect (m4,127.0.0.1,root,,test,$SERVER_MYPORT_1,)
--connect (m5,127.0.0.1,root,,test,$SERVER_MYPORT_1,)
--connect (m6,127.0.0.1,root,,test,$SERVER_MYPORT_1,)
--connect (m7,127.0.0.1,root,,test,$SERVER_MYPORT_1,)
--connect (m8,127.0.0.1,root,,test,$SERVER_MYPORT_1,)
--connection default
# The default connection updates a=3 in a transaction that stays open until
# the COMMIT further down.
BEGIN; UPDATE t1 SET b=b+1 WHERE a=3;
--connection m1
# Pattern used for every waiter: arm a debug_sync signal, issue the UPDATE
# asynchronously with "send", then have the default connection wait for the
# signal before the next waiter is started.
# NOTE(review): presumably the thd_report_wait_for sync point is hit when the
# statement reports that it is waiting for a lock -- confirm in server source.
SET debug_sync='thd_report_wait_for SIGNAL waiting1';
send UPDATE t1 SET b=1001 WHERE a=3;
--connection default
SET debug_sync='now WAIT_FOR waiting1';
--connection m2
# m2 first updates a=5 in its own open transaction, then updates a=3, the row
# already updated by the default connection's open transaction.
BEGIN;
UPDATE t1 SET b=1002 WHERE a=5;
SET debug_sync='thd_report_wait_for SIGNAL waiting2';
send UPDATE t1 SET b=102 WHERE a=3;
--connection default
SET debug_sync='now WAIT_FOR waiting2';
# Still inside the default connection's open transaction: also update a=1
# (m6 updates the same row below).
UPDATE t1 SET b=1000 WHERE a=1;
--connection m3
# a=5 was updated by m2's open transaction.
SET debug_sync='thd_report_wait_for SIGNAL waiting3';
send UPDATE t1 SET b=1003 WHERE a=5;
--connection default
SET debug_sync='now WAIT_FOR waiting3';
--connection m4
SET debug_sync='thd_report_wait_for SIGNAL waiting4';
send UPDATE t1 SET b=1004 WHERE a=3;
--connection default
SET debug_sync='now WAIT_FOR waiting4';
--connection m5
SET debug_sync='thd_report_wait_for SIGNAL waiting5';
send UPDATE t1 SET b=1005 WHERE a=5;
--connection default
SET debug_sync='now WAIT_FOR waiting5';
--connection m6
# a=1 was updated by the default connection's open transaction.
SET debug_sync='thd_report_wait_for SIGNAL waiting6';
send UPDATE t1 SET b=1006 WHERE a=1;
--connection default
SET debug_sync='now WAIT_FOR waiting6';
--connection m7
SET debug_sync='thd_report_wait_for SIGNAL waiting7';
send UPDATE t1 SET b=1007 WHERE a=5;
--connection default
SET debug_sync='now WAIT_FOR waiting7';
--connection m8
SET debug_sync='thd_report_wait_for SIGNAL waiting8';
send UPDATE t1 SET b=1008 WHERE a=3;
--connection default
SET debug_sync='now WAIT_FOR waiting8';
--connection default
# Commit the default connection's transaction, then collect the result of
# every statement that was issued with "send". m2 also commits its own
# transaction after its UPDATE completes.
COMMIT;
--connection m1
REAP;
--connection m2
REAP;
COMMIT;
--connection m3
REAP;
--connection m4
REAP;
--connection m5
REAP;
--connection m6
REAP;
--connection m7
REAP;
--connection m8
REAP;
--connection default
SET debug_sync='RESET';
--source include/save_master_gtid.inc
# It is not deterministic in which order the parallel conflicting
# updates will run. Eg. for key a=5, we could get 1003, 1005, or
# 1007. As long as we get the same on the slave, it is ok.
# The whole table is folded into one string "(a,b)/(a,b)/..." for comparison.
--let $master_value= `SELECT GROUP_CONCAT(CONCAT("(", a, ",", b, ")") ORDER BY a, b SEPARATOR "/") FROM t1`
--connection slave
--source include/start_slave.inc
--source include/sync_with_master_gtid.inc
--let $slave_value= `SELECT GROUP_CONCAT(CONCAT("(", a, ",", b, ")") ORDER BY a, b SEPARATOR "/") FROM t1`
# The query log is disabled while the (non-deterministic) values are copied
# into user variables, so they are not echoed into the test output.
--disable_query_log
eval SET @master_value="$master_value";
eval SET @slave_value="$slave_value";
--enable_query_log
SELECT IF(@master_value=@slave_value, "Slave data matches master", CONCAT("ERROR: Slave had different data '", @slave_value, "' than master's '", @master_value, "'!")) as check_result;
# Retry counter after replication, compared with the value sampled above.
--let $retry2= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1)
--disable_query_log
eval SELECT IF($retry1=$retry2, "Ok, no retry",
CONCAT("ERROR: ", $retry2-$retry1, " retries during replication (was ",
$retry1, " now ", $retry2, ")")) AS status;
--enable_query_log
# Section: switch the slave to binlog_format=MIXED and READ COMMITTED
# isolation, replicate a series of INSERT ... SELECT COUNT(*) statements that
# alternate between t1 and t2, and select both tables on master and slave.
# The slave settings are restored at the end of the section.
--echo *** Test that we replicate correctly when using READ COMMITTED and binlog_format=MIXED on the slave ***
--connection slave
--source include/stop_slave.inc
# Save the current global values so they can be restored below.
SET @old_format= @@GLOBAL.binlog_format;
# Use MIXED format; we cannot binlog ROW events on slave in STATEMENT format.
SET GLOBAL binlog_format= MIXED;
SET @old_isolation= @@GLOBAL.tx_isolation;
SET GLOBAL TRANSACTION ISOLATION LEVEL READ COMMITTED;
# Reset the worker threads to make the new settings take effect.
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
--connection master
# Recreate t1 and t2 with two integer columns each.
DROP TABLE t1, t2;
CREATE TABLE t1 (a int PRIMARY KEY, b INT) ENGINE=TokuDB;
CREATE TABLE t2 (a int PRIMARY KEY, b INT) ENGINE=TokuDB;
INSERT INTO t1 VALUES (1,0), (2,0), (3,0);
INSERT INTO t2 VALUES (1,0), (2,0);
# Each statement stores the current row count of the other table, so the
# value inserted depends on which earlier inserts are visible when it runs.
INSERT INTO t1 SELECT 4, COUNT(*) FROM t2;
INSERT INTO t2 SELECT 4, COUNT(*) FROM t1;
INSERT INTO t1 SELECT 5, COUNT(*) FROM t2;
INSERT INTO t2 SELECT 5, COUNT(*) FROM t1;
INSERT INTO t2 SELECT 6, COUNT(*) FROM t1;
INSERT INTO t1 SELECT 6, COUNT(*) FROM t2;
INSERT INTO t1 SELECT 7, COUNT(*) FROM t2;
INSERT INTO t2 SELECT 7, COUNT(*) FROM t1;
INSERT INTO t2 SELECT 8, COUNT(*) FROM t1;
INSERT INTO t1 SELECT 8, COUNT(*) FROM t2;
INSERT INTO t2 SELECT 9, COUNT(*) FROM t1;
INSERT INTO t1 SELECT 9, COUNT(*) FROM t2;
INSERT INTO t1 SELECT 10, COUNT(*) FROM t2;
INSERT INTO t2 SELECT 10, COUNT(*) FROM t1;
# Master-side contents.
SELECT * FROM t1 ORDER BY a;
SELECT * FROM t2 ORDER BY a;
--source include/save_master_gtid.inc
--connection slave
--source include/start_slave.inc
--source include/sync_with_master_gtid.inc
# Slave-side contents after catching up.
SELECT * FROM t1 ORDER BY a;
SELECT * FROM t2 ORDER BY a;
# Restore the slave globals saved at the top of this section.
--source include/stop_slave.inc
SET GLOBAL binlog_format= @old_format;
SET GLOBAL tx_isolation= @old_isolation;
--source include/start_slave.inc
# Section: regression test for MDEV-7888. The slave runs with the
# inject_analyze_table_sleep debug keyword enabled while it replicates an
# ANALYZE TABLE followed by DML, DDL and a multi-row transaction. t1, t2 and
# t3 are selected on both servers afterwards.
--echo *** MDEV-7888: ANALYZE TABLE does wakeup_subsequent_commits(), causing wrong binlog order and parallel replication hang ***
--connection master
DROP TABLE t1, t2, t3;
CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=TokuDB;
CREATE TABLE t2 (a INT PRIMARY KEY, b INT) ENGINE=TokuDB;
CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=MyISAM;
INSERT INTO t2 VALUES (1,1), (2,1), (3,1), (4,1), (5,1);
--source include/save_master_gtid.inc
--connection slave
# Let the slave apply the table setup before it is stopped.
--source include/sync_with_master_gtid.inc
--source include/stop_slave.inc
# Save the slave's debug_dbug setting in @old_dbug; it is restored at the end
# of this section.
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug= '+d,inject_analyze_table_sleep';
--connection master
# The bug was that ANALYZE TABLE would call
# wakeup_subsequent_commits() too early, allowing the following
# transaction in the same group to run ahead and binlog and free the
# GCO. Then we get wrong binlog order and later access freed GCO,
# which causes lost wakeup of following GCO and thus replication hang.
# We injected a small sleep in ANALYZE to make the race easier to hit (this
# can only cause false negatives in versions with the bug, not false positives,
# so sleep is ok here. And it's in general not possible to trigger reliably
# the race with debug_sync, since the bugfix makes the race impossible).
ALTER TABLE t2 COMMENT "123abc";
ANALYZE TABLE t2;
INSERT INTO t1 VALUES (1,2);
INSERT INTO t1 VALUES (2,2);
INSERT INTO t1 VALUES (3,2);
INSERT INTO t1 VALUES (4,2);
INSERT INTO t3 VALUES (1,3);
ALTER TABLE t2 COMMENT "hello, world";
# One multi-statement transaction inserting rows 5..20 into t1.
BEGIN;
INSERT INTO t1 VALUES (5,4);
INSERT INTO t1 VALUES (6,4);
INSERT INTO t1 VALUES (7,4);
INSERT INTO t1 VALUES (8,4);
INSERT INTO t1 VALUES (9,4);
INSERT INTO t1 VALUES (10,4);
INSERT INTO t1 VALUES (11,4);
INSERT INTO t1 VALUES (12,4);
INSERT INTO t1 VALUES (13,4);
INSERT INTO t1 VALUES (14,4);
INSERT INTO t1 VALUES (15,4);
INSERT INTO t1 VALUES (16,4);
INSERT INTO t1 VALUES (17,4);
INSERT INTO t1 VALUES (18,4);
INSERT INTO t1 VALUES (19,4);
INSERT INTO t1 VALUES (20,4);
COMMIT;
INSERT INTO t1 VALUES (21,5);
INSERT INTO t1 VALUES (22,5);
# Master-side contents.
SELECT * FROM t1 ORDER BY a;
SELECT * FROM t2 ORDER BY a;
SELECT * FROM t3 ORDER BY a;
--source include/save_master_gtid.inc
--connection slave
--source include/start_slave.inc
--source include/sync_with_master_gtid.inc
# Slave-side contents after catching up.
SELECT * FROM t1 ORDER BY a;
SELECT * FROM t2 ORDER BY a;
SELECT * FROM t3 ORDER BY a;
--source include/stop_slave.inc
# Restore the debug_dbug value saved in @old_dbug at the start of this
# section, which removes the injected sleep keyword again.
SET GLOBAL debug_dbug= @old_dbug;
--source include/start_slave.inc
# Section: regression test for MDEV-7929. The slave runs with the
# inject_record_gtid_serverid_100_sleep debug keyword enabled while it
# replicates DDL, inserts into t1, and one ANALYZE TABLE that the master
# executes with its session server_id set to 100. Rows of t1 with a >= 30 are
# selected on both servers afterwards.
--echo *** MDEV-7929: record_gtid() for non-transactional event group calls wakeup_subsequent_commits() too early, causing slave hang. ***
--connection slave
--source include/stop_slave.inc
# Save the slave's debug_dbug setting in @old_dbug; it is restored at the end
# of this section.
SET @old_dbug= @@GLOBAL.debug_dbug;
# The bug was that record_gtid(), when there is no existing transaction from
# a DML event being replicated, would commit its own transaction. This wrongly
# caused wakeup_subsequent_commits(), with similar consequences as MDEV-7888
# above. We simulate this condition with a small sleep in record_gtid() for
# a specific ANALYZE that we binlog with server id 100.
SET GLOBAL debug_dbug= '+d,inject_record_gtid_serverid_100_sleep';
--connection master
ALTER TABLE t3 COMMENT "DDL statement 1";
INSERT INTO t1 VALUES (30,0);
INSERT INTO t1 VALUES (31,0);
INSERT INTO t1 VALUES (32,0);
INSERT INTO t1 VALUES (33,0);
INSERT INTO t1 VALUES (34,0);
INSERT INTO t1 VALUES (35,0);
INSERT INTO t1 VALUES (36,0);
# Run the ANALYZE with server_id 100, then put the session server_id back.
SET @old_server_id= @@SESSION.server_id;
SET SESSION server_id= 100;
ANALYZE TABLE t2;
SET SESSION server_id= @old_server_id;
INSERT INTO t1 VALUES (37,0);
ALTER TABLE t3 COMMENT "DDL statement 2";
INSERT INTO t1 VALUES (38,0);
INSERT INTO t1 VALUES (39,0);
ALTER TABLE t3 COMMENT "DDL statement 3";
# Master-side contents.
SELECT * FROM t1 WHERE a >= 30 ORDER BY a;
--source include/save_master_gtid.inc
--connection slave
--source include/start_slave.inc
--source include/sync_with_master_gtid.inc
# Slave-side contents after catching up.
SELECT * FROM t1 WHERE a >= 30 ORDER BY a;
--source include/stop_slave.inc
# Restore the debug_dbug value saved in @old_dbug at the start of this
# section, which removes the injected sleep keyword again.
SET GLOBAL debug_dbug= @old_dbug;
--source include/start_slave.inc
# Section: regression test for MDEV-8113. While the slave is stopped, the
# master adds a column to t2, inserts ten rows and drops the column again.
# The slave is then restarted and synced, and rows with a >= 50 are selected
# on both servers.
--echo *** MDEV-8113: ALTER TABLE causes slave hang in optimistic parallel replication ***
--connection slave
--source include/stop_slave.inc
--connection master
ALTER TABLE t2 ADD c INT;
# The inserts name columns (a,b) explicitly, so they do not set the column c.
INSERT INTO t2 (a,b) VALUES (50, 0);
INSERT INTO t2 (a,b) VALUES (51, 1);
INSERT INTO t2 (a,b) VALUES (52, 2);
INSERT INTO t2 (a,b) VALUES (53, 3);
INSERT INTO t2 (a,b) VALUES (54, 4);
INSERT INTO t2 (a,b) VALUES (55, 5);
INSERT INTO t2 (a,b) VALUES (56, 6);
INSERT INTO t2 (a,b) VALUES (57, 7);
INSERT INTO t2 (a,b) VALUES (58, 8);
INSERT INTO t2 (a,b) VALUES (59, 9);
ALTER TABLE t2 DROP COLUMN c;
# Master-side contents.
SELECT * FROM t2 WHERE a >= 50 ORDER BY a;
--source include/save_master_gtid.inc
--connection slave
--source include/start_slave.inc
--source include/sync_with_master_gtid.inc
# Slave-side contents after catching up.
SELECT * FROM t2 WHERE a >= 50 ORDER BY a;
# Clean up.
# Restore the slave's parallel replication settings while the slave is
# stopped, drop the test tables on the master, and end the replication test.
# NOTE(review): @old_parallel_mode and @old_parallel_threads are not set in
# this part of the file; presumably they are saved near the top -- confirm.
--connection slave
--source include/stop_slave.inc
SET GLOBAL slave_parallel_mode=@old_parallel_mode;
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
--source include/start_slave.inc
--connection master
DROP TABLE t1, t2, t3;
--source include/rpl_end.inc
......@@ -75,7 +75,9 @@ int trx_callback(
void *extra) {
uint64_t txn_id = txn->id64(txn);
uint64_t client_id = txn->get_client_id(txn);
uint64_t client_id;
void *client_extra;
txn->get_client_id(txn, &client_id, &client_extra);
uint64_t start_time = txn->get_start_time(txn);
trx_extra_t* e = reinterpret_cast<struct trx_extra_t*>(extra);
THD* thd = e->thd;
......@@ -85,7 +87,7 @@ int trx_callback(
uint64_t tnow = (uint64_t) ::time(NULL);
table->field[2]->store(tnow >= start_time ? tnow - start_time : 0, false);
int error = schema_table_store_record(thd, table);
if (!error && thd_killed(thd))
if (!error && thd_kill_level(thd))
error = ER_QUERY_INTERRUPTED;
return error;
}
......@@ -219,7 +221,7 @@ int lock_waits_callback(
int error = schema_table_store_record(thd, table);
if (!error && thd_killed(thd))
if (!error && thd_kill_level(thd))
error = ER_QUERY_INTERRUPTED;
return error;
......@@ -314,7 +316,9 @@ int locks_callback(
void* extra) {
uint64_t txn_id = txn->id64(txn);
uint64_t client_id = txn->get_client_id(txn);
uint64_t client_id;
void *client_extra;
txn->get_client_id(txn, &client_id, &client_extra);
locks_extra_t* e = reinterpret_cast<struct locks_extra_t*>(extra);
THD* thd = e->thd;
TABLE* table = e->table;
......@@ -361,7 +365,7 @@ int locks_callback(
error = schema_table_store_record(thd, table);
if (!error && thd_killed(thd))
if (!error && thd_kill_level(thd))
error = ER_QUERY_INTERRUPTED;
}
return error;
......@@ -493,7 +497,7 @@ int report_file_map(TABLE* table, THD* thd) {
error = schema_table_store_record(thd, table);
}
if (!error && thd_killed(thd))
if (!error && thd_kill_level(thd))
error = ER_QUERY_INTERRUPTED;
}
if (error == DB_NOTFOUND) {
......@@ -698,7 +702,7 @@ int report_fractal_tree_info(TABLE* table, THD* thd) {
if (error)
error = 0; // ignore read uncommitted errors
}
if (!error && thd_killed(thd))
if (!error && thd_kill_level(thd))
error = ER_QUERY_INTERRUPTED;
}
if (error == DB_NOTFOUND) {
......@@ -989,7 +993,7 @@ int report_fractal_tree_block_map(TABLE* table, THD* thd) {
table,
thd);
}
if (!error && thd_killed(thd))
if (!error && thd_kill_level(thd))
error = ER_QUERY_INTERRUPTED;
}
if (error == DB_NOTFOUND) {
......
......@@ -116,7 +116,7 @@ inline int txn_begin(
int r = env->txn_begin(env, parent, txn, flags);
if (r == 0 && thd) {
DB_TXN* this_txn = *txn;
this_txn->set_client_id(this_txn, thd_get_thread_id(thd));
this_txn->set_client_id(this_txn, thd_get_thread_id(thd), thd);
}
TOKUDB_TRACE_FOR_FLAGS(
TOKUDB_DEBUG_TXN,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment