Commit 0468430d authored by John Esmet's avatar John Esmet

refs #59 Add the locktree visualization APIs, new accessors in the DB and DB_TXN,

and a new operation in test_stress0 for stress testing coverage
parent e43b6b62
...@@ -450,11 +450,14 @@ static void print_db_env_struct (void) { ...@@ -450,11 +450,14 @@ static void print_db_env_struct (void) {
"void (*set_update) (DB_ENV *env, int (*update_function)(DB *, const DBT *key, const DBT *old_val, const DBT *extra, void (*set_val)(const DBT *new_val, void *set_extra), void *set_extra))", "void (*set_update) (DB_ENV *env, int (*update_function)(DB *, const DBT *key, const DBT *old_val, const DBT *extra, void (*set_val)(const DBT *new_val, void *set_extra), void *set_extra))",
"int (*set_lock_timeout) (DB_ENV *env, uint64_t lock_wait_time_msec)", "int (*set_lock_timeout) (DB_ENV *env, uint64_t lock_wait_time_msec)",
"int (*get_lock_timeout) (DB_ENV *env, uint64_t *lock_wait_time_msec)", "int (*get_lock_timeout) (DB_ENV *env, uint64_t *lock_wait_time_msec)",
"int (*set_lock_timeout_callback) (DB_ENV *env, lock_timeout_callback callback)",
"int (*txn_xa_recover) (DB_ENV*, TOKU_XA_XID list[/*count*/], long count, /*out*/ long *retp, uint32_t flags)", "int (*txn_xa_recover) (DB_ENV*, TOKU_XA_XID list[/*count*/], long count, /*out*/ long *retp, uint32_t flags)",
"int (*get_txn_from_xid) (DB_ENV*, /*in*/ TOKU_XA_XID *, /*out*/ DB_TXN **)", "int (*get_txn_from_xid) (DB_ENV*, /*in*/ TOKU_XA_XID *, /*out*/ DB_TXN **)",
"int (*get_cursor_for_directory) (DB_ENV*, /*in*/ DB_TXN *, /*out*/ DBC **)", "int (*get_cursor_for_directory) (DB_ENV*, /*in*/ DB_TXN *, /*out*/ DBC **)",
"int (*get_cursor_for_persistent_environment) (DB_ENV*, /*in*/ DB_TXN *, /*out*/ DBC **)", "int (*get_cursor_for_persistent_environment)(DB_ENV*, /*in*/ DB_TXN *, /*out*/ DBC **)",
"void (*change_fsync_log_period)(DB_ENV*, uint32_t)", "void (*change_fsync_log_period) (DB_ENV*, uint32_t)",
"int (*iterate_live_transactions) (DB_ENV *env, iterate_transactions_callback callback, void *extra)",
"int (*iterate_pending_lock_requests) (DB_ENV *env, iterate_requests_callback callback, void *extra)",
NULL}; NULL};
sort_and_dump_fields("db_env", true, extra); sort_and_dump_fields("db_env", true, extra);
...@@ -539,6 +542,7 @@ static void print_db_struct (void) { ...@@ -539,6 +542,7 @@ static void print_db_struct (void) {
"int (*update_broadcast)(DB *, DB_TXN*, const DBT *extra, uint32_t flags)", "int (*update_broadcast)(DB *, DB_TXN*, const DBT *extra, uint32_t flags)",
"int (*get_fractal_tree_info64)(DB*,uint64_t*,uint64_t*,uint64_t*,uint64_t*)", "int (*get_fractal_tree_info64)(DB*,uint64_t*,uint64_t*,uint64_t*,uint64_t*)",
"int (*iterate_fractal_tree_block_map)(DB*,int(*)(uint64_t,int64_t,int64_t,int64_t,int64_t,void*),void*)", "int (*iterate_fractal_tree_block_map)(DB*,int(*)(uint64_t,int64_t,int64_t,int64_t,int64_t,void*),void*)",
"const char *(*get_dname)(DB *db)",
NULL}; NULL};
sort_and_dump_fields("db", true, extra); sort_and_dump_fields("db", true, extra);
} }
...@@ -566,6 +570,8 @@ static void print_db_txn_struct (void) { ...@@ -566,6 +570,8 @@ static void print_db_txn_struct (void) {
"int (*abort_with_progress)(DB_TXN*, TXN_PROGRESS_POLL_FUNCTION, void*)", "int (*abort_with_progress)(DB_TXN*, TXN_PROGRESS_POLL_FUNCTION, void*)",
"int (*xa_prepare) (DB_TXN*, TOKU_XA_XID *)", "int (*xa_prepare) (DB_TXN*, TOKU_XA_XID *)",
"uint64_t (*id64) (DB_TXN*)", "uint64_t (*id64) (DB_TXN*)",
"void (*set_client_id)(DB_TXN *, uint64_t client_id)",
"uint64_t (*get_client_id)(DB_TXN *)",
NULL}; NULL};
sort_and_dump_fields("db_txn", false, extra); sort_and_dump_fields("db_txn", false, extra);
} }
...@@ -759,6 +765,10 @@ int main (int argc, char *const argv[] __attribute__((__unused__))) { ...@@ -759,6 +765,10 @@ int main (int argc, char *const argv[] __attribute__((__unused__))) {
printf("void toku_dbt_array_destroy_shallow(DBT_ARRAY *dbts) %s;\n", VISIBLE); printf("void toku_dbt_array_destroy_shallow(DBT_ARRAY *dbts) %s;\n", VISIBLE);
printf("void toku_dbt_array_resize(DBT_ARRAY *dbts, uint32_t size) %s;\n", VISIBLE); printf("void toku_dbt_array_resize(DBT_ARRAY *dbts, uint32_t size) %s;\n", VISIBLE);
printf("typedef void (*lock_timeout_callback)(DB *db, uint64_t requesting_txnid, const DBT *left_key, const DBT *right_key, uint64_t blocking_txnid);\n");
printf("typedef int (*iterate_row_locks_callback)(DB **db, DBT *left_key, DBT *right_key, void *extra);\n");
printf("typedef int (*iterate_transactions_callback)(uint64_t txnid, uint64_t client_id, iterate_row_locks_callback cb, void *locks_extra, void *extra);\n");
printf("typedef int (*iterate_requests_callback)(DB *db, uint64_t requesting_txnid, const DBT *left_key, const DBT *right_key, uint64_t blocking_txnid, uint64_t start_time, void *extra);\n");
print_db_env_struct(); print_db_env_struct();
print_db_key_range_struct(); print_db_key_range_struct();
print_db_lsn_struct(); print_db_lsn_struct();
......
...@@ -284,6 +284,7 @@ struct tokutxn { ...@@ -284,6 +284,7 @@ struct tokutxn {
TOKUTXN_STATE state; TOKUTXN_STATE state;
uint32_t num_pin; // number of threads (all hot indexes) that want this uint32_t num_pin; // number of threads (all hot indexes) that want this
// txn to not transition to commit or abort // txn to not transition to commit or abort
uint64_t client_id;
}; };
static inline int static inline int
......
...@@ -346,7 +346,8 @@ static txn_child_manager tcm; ...@@ -346,7 +346,8 @@ static txn_child_manager tcm;
.state_lock = ZERO_MUTEX_INITIALIZER, .state_lock = ZERO_MUTEX_INITIALIZER,
.state_cond = ZERO_COND_INITIALIZER, .state_cond = ZERO_COND_INITIALIZER,
.state = TOKUTXN_LIVE, .state = TOKUTXN_LIVE,
.num_pin = 0 .num_pin = 0,
.client_id = 0,
}; };
TOKUTXN result = NULL; TOKUTXN result = NULL;
...@@ -790,10 +791,17 @@ bool toku_txn_has_spilled_rollback(TOKUTXN txn) { ...@@ -790,10 +791,17 @@ bool toku_txn_has_spilled_rollback(TOKUTXN txn) {
return txn_has_spilled_rollback_logs(txn); return txn_has_spilled_rollback_logs(txn);
} }
uint64_t toku_txn_get_client_id(TOKUTXN txn) {
return txn->client_id;
}
void toku_txn_set_client_id(TOKUTXN txn, uint64_t client_id) {
txn->client_id = client_id;
}
#include <toku_race_tools.h> #include <toku_race_tools.h>
void __attribute__((__constructor__)) toku_txn_status_helgrind_ignore(void); void __attribute__((__constructor__)) toku_txn_status_helgrind_ignore(void);
void void toku_txn_status_helgrind_ignore(void) {
toku_txn_status_helgrind_ignore(void) {
TOKU_VALGRIND_HG_DISABLE_CHECKING(&txn_status, sizeof txn_status); TOKU_VALGRIND_HG_DISABLE_CHECKING(&txn_status, sizeof txn_status);
} }
......
...@@ -218,4 +218,7 @@ void toku_txn_unpin_live_txn(TOKUTXN txn); ...@@ -218,4 +218,7 @@ void toku_txn_unpin_live_txn(TOKUTXN txn);
bool toku_txn_has_spilled_rollback(TOKUTXN txn); bool toku_txn_has_spilled_rollback(TOKUTXN txn);
uint64_t toku_txn_get_client_id(TOKUTXN txn);
void toku_txn_set_client_id(TOKUTXN txn, uint64_t client_id);
#endif //TOKUTXN_H #endif //TOKUTXN_H
...@@ -100,6 +100,8 @@ namespace toku { ...@@ -100,6 +100,8 @@ namespace toku {
// initialize a lock request's internals // initialize a lock request's internals
void lock_request::create(uint64_t wait_time) { void lock_request::create(uint64_t wait_time) {
m_txnid = TXNID_NONE; m_txnid = TXNID_NONE;
m_conflicting_txnid = TXNID_NONE;
m_start_time = 0;
m_left_key = nullptr; m_left_key = nullptr;
m_right_key = nullptr; m_right_key = nullptr;
toku_init_dbt(&m_left_key_copy); toku_init_dbt(&m_left_key_copy);
...@@ -215,8 +217,10 @@ int lock_request::start(void) { ...@@ -215,8 +217,10 @@ int lock_request::start(void) {
// and check for a deadlock. if there is one, complete it as failed // and check for a deadlock. if there is one, complete it as failed
if (r == DB_LOCK_NOTGRANTED) { if (r == DB_LOCK_NOTGRANTED) {
copy_keys(); copy_keys();
toku_mutex_lock(&m_info->mutex);
m_state = state::PENDING; m_state = state::PENDING;
m_start_time = toku_current_time_microsec() / 1000;
m_conflicting_txnid = conflicts.get(0);
toku_mutex_lock(&m_info->mutex);
insert_into_lock_requests(); insert_into_lock_requests();
if (deadlock_exists(conflicts)) { if (deadlock_exists(conflicts)) {
remove_from_lock_requests(); remove_from_lock_requests();
...@@ -289,6 +293,18 @@ const DBT *lock_request::get_right_key(void) const { ...@@ -289,6 +293,18 @@ const DBT *lock_request::get_right_key(void) const {
return m_right_key; return m_right_key;
} }
TXNID lock_request::get_txnid(void) const {
return m_txnid;
}
uint64_t lock_request::get_start_time(void) const {
return m_start_time;
}
TXNID lock_request::get_conflicting_txnid(void) const {
return m_conflicting_txnid;
}
int lock_request::retry(void) { int lock_request::retry(void) {
int r; int r;
......
...@@ -145,10 +145,21 @@ public: ...@@ -145,10 +145,21 @@ public:
// or simply DB_LOCK_NOTGRANTED if the wait time expired. // or simply DB_LOCK_NOTGRANTED if the wait time expired.
int wait(void); int wait(void);
// return: left end-point of the lock range
const DBT *get_left_key(void) const; const DBT *get_left_key(void) const;
// return: right end-point of the lock range
const DBT *get_right_key(void) const; const DBT *get_right_key(void) const;
// return: the txnid waiting for a lock
TXNID get_txnid(void) const;
// return: when this lock request started, as milliseconds from epoch
uint64_t get_start_time(void) const;
// return: which txnid is blocking this request (there may be more, though)
TXNID get_conflicting_txnid(void) const;
// effect: Retries all of the lock requests for the given locktree. // effect: Retries all of the lock requests for the given locktree.
// Any lock requests successfully restarted is completed and woken up. // Any lock requests successfully restarted is completed and woken up.
// The rest remain pending. // The rest remain pending.
...@@ -168,6 +179,8 @@ private: ...@@ -168,6 +179,8 @@ private:
// copies these keys and stores them in m_left_key_copy etc and // copies these keys and stores them in m_left_key_copy etc and
// sets the temporary pointers to null. // sets the temporary pointers to null.
TXNID m_txnid; TXNID m_txnid;
TXNID m_conflicting_txnid;
uint64_t m_start_time;
const DBT *m_left_key; const DBT *m_left_key;
const DBT *m_right_key; const DBT *m_right_key;
DBT m_left_key_copy; DBT m_left_key_copy;
......
...@@ -770,4 +770,8 @@ int locktree::compare(const locktree *lt) { ...@@ -770,4 +770,8 @@ int locktree::compare(const locktree *lt) {
} }
} }
DICTIONARY_ID locktree::get_dict_id() const {
return m_dict_id;
}
} /* namespace toku */ } /* namespace toku */
...@@ -186,6 +186,8 @@ public: ...@@ -186,6 +186,8 @@ public:
int compare(const locktree *lt); int compare(const locktree *lt);
DICTIONARY_ID get_dict_id() const;
struct lt_counters { struct lt_counters {
uint64_t wait_count, wait_time; uint64_t wait_count, wait_time;
uint64_t long_wait_count, long_wait_time; uint64_t long_wait_count, long_wait_time;
...@@ -293,6 +295,17 @@ public: ...@@ -293,6 +295,17 @@ public:
void get_status(LTM_STATUS status); void get_status(LTM_STATUS status);
// effect: calls the iterate function on each pending lock request
// note: holds the manager's mutex
typedef int (*lock_request_iterate_callback)(DICTIONARY_ID dict_id,
TXNID txnid,
const DBT *left_key,
const DBT *right_key,
TXNID blocking_txnid,
uint64_t start_time,
void *extra);
int iterate_pending_lock_requests(lock_request_iterate_callback cb, void *extra);
private: private:
static const uint64_t DEFAULT_MAX_LOCK_MEMORY = 64L * 1024 * 1024; static const uint64_t DEFAULT_MAX_LOCK_MEMORY = 64L * 1024 * 1024;
static const uint64_t DEFAULT_LOCK_WAIT_TIME = 0; static const uint64_t DEFAULT_LOCK_WAIT_TIME = 0;
......
...@@ -93,6 +93,8 @@ PATENT RIGHTS GRANT: ...@@ -93,6 +93,8 @@ PATENT RIGHTS GRANT:
#include <portability/toku_pthread.h> #include <portability/toku_pthread.h>
#include "locktree.h" #include "locktree.h"
#include "lock_request.h"
#include <util/status.h> #include <util/status.h>
namespace toku { namespace toku {
...@@ -373,6 +375,35 @@ bool locktree::manager::memory_tracker::out_of_locks(void) const { ...@@ -373,6 +375,35 @@ bool locktree::manager::memory_tracker::out_of_locks(void) const {
return m_mgr->m_current_lock_memory >= m_mgr->m_max_lock_memory; return m_mgr->m_current_lock_memory >= m_mgr->m_max_lock_memory;
} }
int locktree::manager::iterate_pending_lock_requests(
lock_request_iterate_callback callback, void *extra) {
mutex_lock();
int r = 0;
size_t num_locktrees = m_locktree_map.size();
for (size_t i = 0; i < num_locktrees && r == 0; i++) {
locktree *lt;
r = m_locktree_map.fetch(i, &lt);
invariant_zero(r);
struct lt_lock_request_info *info = &lt->m_lock_request_info;
toku_mutex_lock(&info->mutex);
size_t num_requests = info->pending_lock_requests.size();
for (size_t k = 0; k < num_requests && r == 0; k++) {
lock_request *req;
r = info->pending_lock_requests.fetch(k, &req);
invariant_zero(r);
r = callback(lt->m_dict_id, req->get_txnid(),
req->get_left_key(), req->get_right_key(),
req->get_conflicting_txnid(), req->get_start_time(), extra);
}
toku_mutex_unlock(&info->mutex);
}
mutex_unlock();
return r;
}
#define STATUS_INIT(k,c,t,l,inc) TOKUDB_STATUS_INIT(status, k, c, t, "locktree: " l, inc) #define STATUS_INIT(k,c,t,l,inc) TOKUDB_STATUS_INIT(status, k, c, t, "locktree: " l, inc)
void locktree::manager::status_init(void) { void locktree::manager::status_init(void) {
...@@ -452,5 +483,4 @@ void locktree::manager::get_status(LTM_STATUS statp) { ...@@ -452,5 +483,4 @@ void locktree::manager::get_status(LTM_STATUS statp) {
} }
#undef STATUS_VALUE #undef STATUS_VALUE
} /* namespace toku */ } /* namespace toku */
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*
COPYING CONDITIONS NOTICE:
This program is free software; you can redistribute it and/or modify
it under the terms of version 2 of the GNU General Public License as
published by the Free Software Foundation, and provided that the
following conditions are met:
* Redistributions of source code must retain this COPYING
CONDITIONS NOTICE, the COPYRIGHT NOTICE (below), the
DISCLAIMER (below), the UNIVERSITY PATENT NOTICE (below), the
PATENT MARKING NOTICE (below), and the PATENT RIGHTS
GRANT (below).
* Redistributions in binary form must reproduce this COPYING
CONDITIONS NOTICE, the COPYRIGHT NOTICE (below), the
DISCLAIMER (below), the UNIVERSITY PATENT NOTICE (below), the
PATENT MARKING NOTICE (below), and the PATENT RIGHTS
GRANT (below) in the documentation and/or other materials
provided with the distribution.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301, USA.
COPYRIGHT NOTICE:
TokuDB, Tokutek Fractal Tree Indexing Library.
Copyright (C) 2007-2013 Tokutek, Inc.
DISCLAIMER:
This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
UNIVERSITY PATENT NOTICE:
The technology is licensed by the Massachusetts Institute of
Technology, Rutgers State University of New Jersey, and the Research
Foundation of State University of New York at Stony Brook under
United States of America Serial No. 11/760379 and to the patents
and/or patent applications resulting from it.
PATENT MARKING NOTICE:
This software is covered by US Patent No. 8,185,551.
PATENT RIGHTS GRANT:
"THIS IMPLEMENTATION" means the copyrightable works distributed by
Tokutek as part of the Fractal Tree project.
"PATENT CLAIMS" means the claims of patents that are owned or
licensable by Tokutek, both currently or in the future; and that in
the absence of this license would be infringed by THIS
IMPLEMENTATION or by using or running THIS IMPLEMENTATION.
"PATENT CHALLENGE" shall mean a challenge to the validity,
patentability, enforceability and/or non-infringement of any of the
PATENT CLAIMS or otherwise opposing any of the PATENT CLAIMS.
Tokutek hereby grants to you, for the term and geographical scope of
the PATENT CLAIMS, a non-exclusive, no-charge, royalty-free,
irrevocable (except as stated in this section) patent license to
make, have made, use, offer to sell, sell, import, transfer, and
otherwise run, modify, and propagate the contents of THIS
IMPLEMENTATION, where such license applies only to the PATENT
CLAIMS. This grant does not include claims that would be infringed
only as a consequence of further modifications of THIS
IMPLEMENTATION. If you or your agent or licensee institute or order
or agree to the institution of patent litigation against any entity
(including a cross-claim or counterclaim in a lawsuit) alleging that
THIS IMPLEMENTATION constitutes direct or contributory patent
infringement, or inducement of patent infringement, then any rights
granted to you under this License shall terminate as of the date
such litigation is filed. If you or your agent or exclusive
licensee institute or order or agree to the institution of a PATENT
CHALLENGE, then Tokutek may terminate any rights granted to you
under this License.
*/
#ident "Copyright (c) 2007-2013 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "test.h"
static DB_TXN *txn1, *txn2, *txn3;
struct iterate_extra {
iterate_extra() : n(0) {
visited_txn[0] = false;
visited_txn[1] = false;
visited_txn[2] = false;
}
int n;
bool visited_txn[3];
};
static int iterate_callback(uint64_t txnid, uint64_t client_id,
iterate_row_locks_callback iterate_locks,
void *locks_extra, void *extra) {
iterate_extra *info = reinterpret_cast<iterate_extra *>(extra);
DB *db;
DBT left_key, right_key;
int r = iterate_locks(&db, &left_key, &right_key, locks_extra);
invariant(r == DB_NOTFOUND);
if (txnid == txn1->id64(txn1)) {
assert(!info->visited_txn[0]);
invariant(client_id == 0);
info->visited_txn[0] = true;
} else if (txnid == txn2->id64(txn2)) {
assert(!info->visited_txn[1]);
invariant(client_id == 1);
info->visited_txn[1] = true;
} else if (txnid == txn3->id64(txn3)) {
assert(!info->visited_txn[2]);
invariant(client_id == 2);
info->visited_txn[2] = true;
}
info->n++;
return 0;
}
int test_main(int UU(argc), char *const UU(argv[])) {
int r;
const int env_flags = DB_INIT_MPOOL | DB_CREATE | DB_THREAD |
DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_TXN | DB_PRIVATE;
toku_os_recursive_delete(TOKU_TEST_FILENAME);
r = toku_os_mkdir(TOKU_TEST_FILENAME, 0755); CKERR(r);
DB_ENV *env;
r = db_env_create(&env, 0); CKERR(r);
r = env->iterate_live_transactions(env, iterate_callback, NULL);
assert(r == EINVAL);
r = env->open(env, TOKU_TEST_FILENAME, env_flags, 0755); CKERR(r);
r = env->txn_begin(env, NULL, &txn1, 0); CKERR(r);
txn1->set_client_id(txn1, 0);
r = env->txn_begin(env, NULL, &txn2, 0); CKERR(r);
txn2->set_client_id(txn2, 1);
r = env->txn_begin(env, NULL, &txn3, 0); CKERR(r);
txn3->set_client_id(txn3, 2);
{
iterate_extra e;
r = env->iterate_live_transactions(env, iterate_callback, &e); CKERR(r);
assert(e.visited_txn[0]);
assert(e.visited_txn[1]);
assert(e.visited_txn[2]);
assert(e.n == 3);
}
r = txn1->commit(txn1, 0); CKERR(r);
r = txn2->abort(txn2); CKERR(r);
{
iterate_extra e;
r = env->iterate_live_transactions(env, iterate_callback, &e); CKERR(r);
assert(!e.visited_txn[0]);
assert(!e.visited_txn[1]);
assert(e.visited_txn[2]);
assert(e.n == 1);
}
r = txn3->commit(txn3, 0); CKERR(r);
{
iterate_extra e;
r = env->iterate_live_transactions(env, iterate_callback, &e); CKERR(r);
assert(!e.visited_txn[0]);
assert(!e.visited_txn[1]);
assert(!e.visited_txn[2]);
assert(e.n == 0);
}
r = env->close(env, 0); CKERR(r);
return 0;
}
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*
COPYING CONDITIONS NOTICE:
This program is free software; you can redistribute it and/or modify
it under the terms of version 2 of the GNU General Public License as
published by the Free Software Foundation, and provided that the
following conditions are met:
* Redistributions of source code must retain this COPYING
CONDITIONS NOTICE, the COPYRIGHT NOTICE (below), the
DISCLAIMER (below), the UNIVERSITY PATENT NOTICE (below), the
PATENT MARKING NOTICE (below), and the PATENT RIGHTS
GRANT (below).
* Redistributions in binary form must reproduce this COPYING
CONDITIONS NOTICE, the COPYRIGHT NOTICE (below), the
DISCLAIMER (below), the UNIVERSITY PATENT NOTICE (below), the
PATENT MARKING NOTICE (below), and the PATENT RIGHTS
GRANT (below) in the documentation and/or other materials
provided with the distribution.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301, USA.
COPYRIGHT NOTICE:
TokuDB, Tokutek Fractal Tree Indexing Library.
Copyright (C) 2007-2013 Tokutek, Inc.
DISCLAIMER:
This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
UNIVERSITY PATENT NOTICE:
The technology is licensed by the Massachusetts Institute of
Technology, Rutgers State University of New Jersey, and the Research
Foundation of State University of New York at Stony Brook under
United States of America Serial No. 11/760379 and to the patents
and/or patent applications resulting from it.
PATENT MARKING NOTICE:
This software is covered by US Patent No. 8,185,551.
PATENT RIGHTS GRANT:
"THIS IMPLEMENTATION" means the copyrightable works distributed by
Tokutek as part of the Fractal Tree project.
"PATENT CLAIMS" means the claims of patents that are owned or
licensable by Tokutek, both currently or in the future; and that in
the absence of this license would be infringed by THIS
IMPLEMENTATION or by using or running THIS IMPLEMENTATION.
"PATENT CHALLENGE" shall mean a challenge to the validity,
patentability, enforceability and/or non-infringement of any of the
PATENT CLAIMS or otherwise opposing any of the PATENT CLAIMS.
Tokutek hereby grants to you, for the term and geographical scope of
the PATENT CLAIMS, a non-exclusive, no-charge, royalty-free,
irrevocable (except as stated in this section) patent license to
make, have made, use, offer to sell, sell, import, transfer, and
otherwise run, modify, and propagate the contents of THIS
IMPLEMENTATION, where such license applies only to the PATENT
CLAIMS. This grant does not include claims that would be infringed
only as a consequence of further modifications of THIS
IMPLEMENTATION. If you or your agent or licensee institute or order
or agree to the institution of patent litigation against any entity
(including a cross-claim or counterclaim in a lawsuit) alleging that
THIS IMPLEMENTATION constitutes direct or contributory patent
infringement, or inducement of patent infringement, then any rights
granted to you under this License shall terminate as of the date
such litigation is filed. If you or your agent or exclusive
licensee institute or order or agree to the institution of a PATENT
CHALLENGE, then Tokutek may terminate any rights granted to you
under this License.
*/
#ident "Copyright (c) 2007-2013 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "test.h"
#include <portability/toku_pthread.h>
static DB_ENV *env;
static DB *db;
static DB_TXN *txn1, *txn2, *txn3;
static const char *dname = "iterate_pending_requests_dname";
static const int magic_key = 100;
static int iterate_callback_called;
toku_pthread_t thread1, thread2;
// Verify the state of the world
static int iterate_callback(DB *_db, uint64_t requesting_txnid,
const DBT *left_key, const DBT *right_key,
uint64_t blocking_txnid, uint64_t start_time, void *extra) {
iterate_callback_called++;
invariant(extra == nullptr);
invariant(strcmp(_db->get_dname(_db), db->get_dname(db)) == 0);
invariant(start_time > 0);
invariant(*reinterpret_cast<int *>(left_key->data) == magic_key);
invariant(*reinterpret_cast<int *>(right_key->data) == magic_key);
invariant(blocking_txnid == txn1->id64(txn1));
invariant(requesting_txnid == txn2->id64(txn2) || requesting_txnid == txn3->id64(txn3));
return 0;
}
static void acquire_lock(DB_TXN *txn, int key) {
int val = 0;
DBT k, v;
dbt_init(&k, &key, sizeof(int));
dbt_init(&v, &val, sizeof(int));
(void) db->put(db, txn, &k, &v, 0);
}
struct acquire_lock_extra {
acquire_lock_extra(DB_TXN *x, int k) :
txn(x), key(k) {
}
DB_TXN *txn;
int key;
};
static void *acquire_lock_thread(void *arg) {
acquire_lock_extra *info = reinterpret_cast<acquire_lock_extra *>(arg);
acquire_lock(info->txn, info->key);
return NULL;
}
int test_main(int UU(argc), char *const UU(argv[])) {
int r;
const int env_flags = DB_INIT_MPOOL | DB_CREATE | DB_THREAD |
DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_TXN | DB_PRIVATE;
toku_os_recursive_delete(TOKU_TEST_FILENAME);
r = toku_os_mkdir(TOKU_TEST_FILENAME, 0755); CKERR(r);
r = db_env_create(&env, 0); CKERR(r);
r = env->open(env, TOKU_TEST_FILENAME, env_flags, 0755); CKERR(r);
r = env->set_lock_timeout(env, 2000);
r = db_create(&db, env, 0); CKERR(r);
r = db->open(db, NULL, dname, NULL, DB_BTREE, DB_CREATE, 0777); CKERR(r);
r = env->txn_begin(env, NULL, &txn1, DB_SERIALIZABLE); CKERR(r);
r = env->txn_begin(env, NULL, &txn2, DB_SERIALIZABLE); CKERR(r);
r = env->txn_begin(env, NULL, &txn3, DB_SERIALIZABLE); CKERR(r);
// Extremely simple test. Get lock [0, 0] on txn1, then asynchronously
// attempt to get that lock in txn2 and txn3. The iterate callback
// verifies that two waiters exist for [0, 0] and that txn1 is
// the blocking txn.
acquire_lock(txn1, magic_key);
acquire_lock_extra e1(txn2, magic_key);
r = toku_pthread_create(&thread1, NULL, acquire_lock_thread, &e1); CKERR(r);
acquire_lock_extra e2(txn3, magic_key);
r = toku_pthread_create(&thread2, NULL, acquire_lock_thread, &e2); CKERR(r);
usleep(100000);
r = env->iterate_pending_lock_requests(env, iterate_callback, NULL); CKERR(r);
invariant(iterate_callback_called == 2);
void *v;
r = toku_pthread_join(thread1, &v); CKERR(r);
r = toku_pthread_join(thread2, &v); CKERR(r);
r = txn1->commit(txn1, 0); CKERR(r);
r = txn2->commit(txn2, 0); CKERR(r);
r = txn3->commit(txn3, 0); CKERR(r);
r = db->close(db, 0); CKERR(r);
r = env->close(env, 0); CKERR(r);
return 0;
}
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*
COPYING CONDITIONS NOTICE:
This program is free software; you can redistribute it and/or modify
it under the terms of version 2 of the GNU General Public License as
published by the Free Software Foundation, and provided that the
following conditions are met:
* Redistributions of source code must retain this COPYING
CONDITIONS NOTICE, the COPYRIGHT NOTICE (below), the
DISCLAIMER (below), the UNIVERSITY PATENT NOTICE (below), the
PATENT MARKING NOTICE (below), and the PATENT RIGHTS
GRANT (below).
* Redistributions in binary form must reproduce this COPYING
CONDITIONS NOTICE, the COPYRIGHT NOTICE (below), the
DISCLAIMER (below), the UNIVERSITY PATENT NOTICE (below), the
PATENT MARKING NOTICE (below), and the PATENT RIGHTS
GRANT (below) in the documentation and/or other materials
provided with the distribution.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301, USA.
COPYRIGHT NOTICE:
TokuDB, Tokutek Fractal Tree Indexing Library.
Copyright (C) 2007-2013 Tokutek, Inc.
DISCLAIMER:
This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
UNIVERSITY PATENT NOTICE:
The technology is licensed by the Massachusetts Institute of
Technology, Rutgers State University of New Jersey, and the Research
Foundation of State University of New York at Stony Brook under
United States of America Serial No. 11/760379 and to the patents
and/or patent applications resulting from it.
PATENT MARKING NOTICE:
This software is covered by US Patent No. 8,185,551.
PATENT RIGHTS GRANT:
"THIS IMPLEMENTATION" means the copyrightable works distributed by
Tokutek as part of the Fractal Tree project.
"PATENT CLAIMS" means the claims of patents that are owned or
licensable by Tokutek, both currently or in the future; and that in
the absence of this license would be infringed by THIS
IMPLEMENTATION or by using or running THIS IMPLEMENTATION.
"PATENT CHALLENGE" shall mean a challenge to the validity,
patentability, enforceability and/or non-infringement of any of the
PATENT CLAIMS or otherwise opposing any of the PATENT CLAIMS.
Tokutek hereby grants to you, for the term and geographical scope of
the PATENT CLAIMS, a non-exclusive, no-charge, royalty-free,
irrevocable (except as stated in this section) patent license to
make, have made, use, offer to sell, sell, import, transfer, and
otherwise run, modify, and propagate the contents of THIS
IMPLEMENTATION, where such license applies only to the PATENT
CLAIMS. This grant does not include claims that would be infringed
only as a consequence of further modifications of THIS
IMPLEMENTATION. If you or your agent or licensee institute or order
or agree to the institution of patent litigation against any entity
(including a cross-claim or counterclaim in a lawsuit) alleging that
THIS IMPLEMENTATION constitutes direct or contributory patent
infringement, or inducement of patent infringement, then any rights
granted to you under this License shall terminate as of the date
such litigation is filed. If you or your agent or exclusive
licensee institute or order or agree to the institution of a PATENT
CHALLENGE, then Tokutek may terminate any rights granted to you
under this License.
*/
#ident "Copyright (c) 2007-2013 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "test.h"
#include <portability/toku_pthread.h>
#include <portability/toku_atomic.h>
static DB_ENV *env;
static DB *db;
static DB_TXN *txn1, *txn2;
static const int magic_key = 100;
static bool callback_calls;
toku_pthread_t thread1;
static void lock_not_granted(DB *_db, uint64_t requesting_txnid,
const DBT *left_key, const DBT *right_key,
uint64_t blocking_txnid) {
toku_sync_fetch_and_add(&callback_calls, 1);
invariant(strcmp(_db->get_dname(_db), db->get_dname(db)) == 0);
if (requesting_txnid == txn2->id64(txn2)) {
invariant(blocking_txnid == txn1->id64(txn1));
invariant(*reinterpret_cast<int *>(left_key->data) == magic_key);
invariant(*reinterpret_cast<int *>(right_key->data) == magic_key);
} else {
invariant(blocking_txnid == txn2->id64(txn2));
invariant(*reinterpret_cast<int *>(left_key->data) == magic_key + 1);
invariant(*reinterpret_cast<int *>(right_key->data) == magic_key + 1);
}
}
static void acquire_lock(DB_TXN *txn, int key) {
int val = 0;
DBT k, v;
dbt_init(&k, &key, sizeof(int));
dbt_init(&v, &val, sizeof(int));
(void) db->put(db, txn, &k, &v, 0);
}
struct acquire_lock_extra {
acquire_lock_extra(DB_TXN *x, int k) :
txn(x), key(k) {
}
DB_TXN *txn;
int key;
};
static void *acquire_lock_thread(void *arg) {
acquire_lock_extra *info = reinterpret_cast<acquire_lock_extra *>(arg);
acquire_lock(info->txn, info->key);
return NULL;
}
int test_main(int UU(argc), char *const UU(argv[])) {
int r;
const int env_flags = DB_INIT_MPOOL | DB_CREATE | DB_THREAD |
DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_TXN | DB_PRIVATE;
toku_os_recursive_delete(TOKU_TEST_FILENAME);
r = toku_os_mkdir(TOKU_TEST_FILENAME, 0755); CKERR(r);
r = db_env_create(&env, 0); CKERR(r);
r = env->open(env, TOKU_TEST_FILENAME, env_flags, 0755); CKERR(r);
r = env->set_lock_timeout(env, 1000);
r = env->set_lock_timeout_callback(env, lock_not_granted);
r = db_create(&db, env, 0); CKERR(r);
r = db->open(db, NULL, "test", NULL, DB_BTREE, DB_CREATE, 0777); CKERR(r);
r = env->txn_begin(env, NULL, &txn1, DB_SERIALIZABLE); CKERR(r);
r = env->txn_begin(env, NULL, &txn2, DB_SERIALIZABLE); CKERR(r);
// Extremely simple test. Get lock [0, 0] on txn1, then asynchronously
// attempt to get that lock in txn2. The timouet callback should get called.
acquire_lock(txn1, magic_key);
invariant(callback_calls == 0);
acquire_lock(txn2, magic_key);
invariant(callback_calls == 1);
// If we enduce a deadlock, the callback should get called.
acquire_lock(txn2, magic_key + 1);
toku_pthread_t thread;
acquire_lock_extra e(txn1, magic_key + 1);
r = toku_pthread_create(&thread, NULL, acquire_lock_thread, &e);
usleep(100000);
acquire_lock(txn2, magic_key);
invariant(callback_calls == 2);
void *v;
r = toku_pthread_join(thread, &v); CKERR(r);
invariant(callback_calls == 3);
// If we set the callback to null, then it shouldn't get called anymore.
env->set_lock_timeout_callback(env, nullptr);
acquire_lock(txn2, magic_key);
invariant(callback_calls == 3);
r = txn1->commit(txn1, 0); CKERR(r);
r = txn2->commit(txn2, 0); CKERR(r);
r = db->close(db, 0); CKERR(r);
r = env->close(env, 0); CKERR(r);
return 0;
}
...@@ -114,11 +114,66 @@ static int UU() lock_escalation_op(DB_TXN *UU(txn), ARG arg, void* operation_ext ...@@ -114,11 +114,66 @@ static int UU() lock_escalation_op(DB_TXN *UU(txn), ARG arg, void* operation_ext
return 0; return 0;
} }
static int iterate_requests(DB *db, uint64_t txnid,
const DBT *left_key, const DBT *right_key,
uint64_t blocking_txnid,
uint64_t UU(start_time),
void *extra) {
invariant_null(extra);
invariant(db != nullptr);
invariant(txnid > 0);
invariant(left_key != nullptr);
invariant(right_key != nullptr);
invariant(blocking_txnid > 0);
invariant(txnid != blocking_txnid);
if (rand() % 5 == 0) {
usleep(100);
}
return 0;
}
static int UU() iterate_pending_lock_requests_op(DB_TXN *UU(txn), ARG arg, void *UU(operation_extra), void *UU(stats_extra)) {
DB_ENV *env = arg->env;
int r = env->iterate_pending_lock_requests(env, iterate_requests, nullptr);
invariant_zero(r);
return r;
}
static int iterate_txns(uint64_t txnid, uint64_t client_id,
iterate_row_locks_callback iterate_locks,
void *locks_extra, void *extra) {
invariant_null(extra);
invariant(txnid > 0);
invariant(client_id == 0);
DB *db;
DBT left_key, right_key;
while (iterate_locks(&db, &left_key, &right_key, locks_extra) == 0) {
invariant_notnull(db);
invariant_notnull(left_key.data);
invariant(left_key.size > 0);
invariant_notnull(right_key.data);
invariant(right_key.size > 0);
if (rand() % 5 == 0) {
usleep(50);
}
memset(&left_key, 0, sizeof(DBT));
memset(&right_key, 0, sizeof(DBT));
}
return 0;
}
static int UU() iterate_live_transactions_op(DB_TXN *UU(txn), ARG arg, void *UU(operation_extra), void *UU(stats_extra)) {
DB_ENV *env = arg->env;
int r = env->iterate_live_transactions(env, iterate_txns, nullptr);
invariant_zero(r);
return r;
}
static void static void
stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
if (verbose) printf("starting creation of pthreads\n"); if (verbose) printf("starting creation of pthreads\n");
const int non_update_threads = 2; const int non_update_threads = 4;
const int num_threads = non_update_threads + cli_args->num_update_threads; const int num_threads = non_update_threads + cli_args->num_update_threads;
struct arg myargs[num_threads]; struct arg myargs[num_threads];
for (int i = 0; i < num_threads; i++) { for (int i = 0; i < num_threads; i++) {
...@@ -137,6 +192,14 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -137,6 +192,14 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
myargs[1].operation_extra = nullptr; myargs[1].operation_extra = nullptr;
myargs[1].operation = lock_escalation_op; myargs[1].operation = lock_escalation_op;
myargs[2].sleep_ms = 1L * 1000;
myargs[2].operation_extra = nullptr;
myargs[2].operation = iterate_pending_lock_requests_op;
myargs[3].sleep_ms = 1L * 1000;
myargs[3].operation_extra = nullptr;
myargs[3].operation = iterate_live_transactions_op;
// make the threads that update the db // make the threads that update the db
struct update_op_args uoe = get_update_op_args(cli_args, NULL); struct update_op_args uoe = get_update_op_args(cli_args, NULL);
for (int i = non_update_threads; i < num_threads; ++i) { for (int i = non_update_threads; i < num_threads; ++i) {
......
...@@ -152,12 +152,14 @@ struct __toku_db_env_internal { ...@@ -152,12 +152,14 @@ struct __toku_db_env_internal {
CACHETABLE cachetable; CACHETABLE cachetable;
TOKULOGGER logger; TOKULOGGER logger;
toku::locktree::manager ltm; toku::locktree::manager ltm;
lock_timeout_callback lock_wait_timeout_callback; // Called when a lock request times out waiting for a lock.
DB *directory; // Maps dnames to inames DB *directory; // Maps dnames to inames
DB *persistent_environment; // Stores environment settings, can be used for upgrade DB *persistent_environment; // Stores environment settings, can be used for upgrade
// TODO: toku::omt<DB *> // TODO: toku::omt<DB *>
OMT open_dbs; // Stores open db handles, sorted first by dname and then by numerical value of pointer to the db (arbitrarily assigned memory location) OMT open_dbs_by_dname; // Stores open db handles, sorted first by dname and then by numerical value of pointer to the db (arbitrarily assigned memory location)
toku_mutex_t open_dbs_lock; // lock that protects the OMT of open dbs. OMT open_dbs_by_dict_id; // Stores open db handles, sorted by dictionary id and then by numerical value of pointer to the db (arbitrarily assigned memory location)
toku_pthread_rwlock_t open_dbs_rwlock; // rwlock that protects the OMT of open dbs.
char *real_data_dir; // data dir used when the env is opened (relative to cwd, or absolute with leading /) char *real_data_dir; // data dir used when the env is opened (relative to cwd, or absolute with leading /)
char *real_log_dir; // log dir used when the env is opened (relative to cwd, or absolute with leading /) char *real_log_dir; // log dir used when the env is opened (relative to cwd, or absolute with leading /)
......
...@@ -1114,8 +1114,8 @@ env_close(DB_ENV * env, uint32_t flags) { ...@@ -1114,8 +1114,8 @@ env_close(DB_ENV * env, uint32_t flags) {
r = toku_ydb_do_error(env, EINVAL, "%s", err_msg); r = toku_ydb_do_error(env, EINVAL, "%s", err_msg);
goto panic_and_quit_early; goto panic_and_quit_early;
} }
if (env->i->open_dbs) { //Verify that there are no open dbs. if (env->i->open_dbs_by_dname) { //Verify that there are no open dbs.
if (toku_omt_size(env->i->open_dbs) > 0) { if (toku_omt_size(env->i->open_dbs_by_dname) > 0) {
err_msg = "Cannot close environment due to open DBs\n"; err_msg = "Cannot close environment due to open DBs\n";
r = toku_ydb_do_error(env, EINVAL, "%s", err_msg); r = toku_ydb_do_error(env, EINVAL, "%s", err_msg);
goto panic_and_quit_early; goto panic_and_quit_early;
...@@ -1191,11 +1191,13 @@ env_close(DB_ENV * env, uint32_t flags) { ...@@ -1191,11 +1191,13 @@ env_close(DB_ENV * env, uint32_t flags) {
toku_free(env->i->real_log_dir); toku_free(env->i->real_log_dir);
if (env->i->real_tmp_dir) if (env->i->real_tmp_dir)
toku_free(env->i->real_tmp_dir); toku_free(env->i->real_tmp_dir);
if (env->i->open_dbs) if (env->i->open_dbs_by_dname)
toku_omt_destroy(&env->i->open_dbs); toku_omt_destroy(&env->i->open_dbs_by_dname);
if (env->i->open_dbs_by_dict_id)
toku_omt_destroy(&env->i->open_dbs_by_dict_id);
if (env->i->dir) if (env->i->dir)
toku_free(env->i->dir); toku_free(env->i->dir);
toku_mutex_destroy(&env->i->open_dbs_lock); toku_pthread_rwlock_destroy(&env->i->open_dbs_rwlock);
// Immediately before freeing internal environment unlock the directories // Immediately before freeing internal environment unlock the directories
unlock_single_process(env); unlock_single_process(env);
...@@ -1717,6 +1719,12 @@ env_set_lock_timeout(DB_ENV *env, uint64_t lock_timeout_msec) { ...@@ -1717,6 +1719,12 @@ env_set_lock_timeout(DB_ENV *env, uint64_t lock_timeout_msec) {
return 0; return 0;
} }
static int
env_set_lock_timeout_callback(DB_ENV *env, lock_timeout_callback callback) {
env->i->lock_wait_timeout_callback = callback;
return 0;
}
static void static void
format_time(const time_t *timer, char *buf) { format_time(const time_t *timer, char *buf) {
ctime_r(timer, buf); ctime_r(timer, buf);
...@@ -2244,6 +2252,169 @@ env_get_cursor_for_directory(DB_ENV* env, DB_TXN* txn, DBC** c) { ...@@ -2244,6 +2252,169 @@ env_get_cursor_for_directory(DB_ENV* env, DB_TXN* txn, DBC** c) {
return toku_db_cursor(env->i->directory, txn, c, 0); return toku_db_cursor(env->i->directory, txn, c, 0);
} }
struct ltm_iterate_requests_callback_extra {
ltm_iterate_requests_callback_extra(DB_ENV *e,
iterate_requests_callback cb,
void *ex) :
env(e), callback(cb), extra(ex) {
}
DB_ENV *env;
iterate_requests_callback callback;
void *extra;
};
static int
find_db_by_dict_id(OMTVALUE v, void *dict_id_v) {
DB *db = (DB *) v;
DICTIONARY_ID dict_id = db->i->dict_id;
DICTIONARY_ID dict_id_find = *(DICTIONARY_ID *) dict_id_v;
if (dict_id.dictid < dict_id_find.dictid) {
return -1;
} else if (dict_id.dictid > dict_id_find.dictid) {
return 1;
} else {
return 0;
}
}
static DB *
locked_get_db_by_dict_id(DB_ENV *env, DICTIONARY_ID dict_id) {
OMTVALUE dbv;
int r = toku_omt_find_zero(env->i->open_dbs_by_dict_id, find_db_by_dict_id,
(void *) &dict_id, &dbv, nullptr);
return r == 0 ? (DB *) dbv : nullptr;
}
static int ltm_iterate_requests_callback(DICTIONARY_ID dict_id, TXNID txnid,
const DBT *left_key,
const DBT *right_key,
TXNID blocking_txnid,
uint64_t start_time,
void *extra) {
ltm_iterate_requests_callback_extra *info =
reinterpret_cast<ltm_iterate_requests_callback_extra *>(extra);
toku_pthread_rwlock_rdlock(&info->env->i->open_dbs_rwlock);
int r = 0;
DB *db = locked_get_db_by_dict_id(info->env, dict_id);
if (db != nullptr) {
r = info->callback(db, txnid, left_key, right_key,
blocking_txnid, start_time, info->extra);
}
toku_pthread_rwlock_rdunlock(&info->env->i->open_dbs_rwlock);
return r;
}
static int
env_iterate_pending_lock_requests(DB_ENV *env,
iterate_requests_callback callback,
void *extra) {
if (!env_opened(env)) {
return EINVAL;
}
toku::locktree::manager *mgr = &env->i->ltm;
ltm_iterate_requests_callback_extra e(env, callback, extra);
return mgr->iterate_pending_lock_requests(ltm_iterate_requests_callback, &e);
}
// for the lifetime of this object:
// - open_dbs_rwlock must be read locked (or better)
// - txn_mutex must be held
struct iter_txn_row_locks_callback_extra {
iter_txn_row_locks_callback_extra(DB_ENV *e, toku::omt<txn_lt_key_ranges> *m) :
env(e), current_db(nullptr), which_lt(0), lt_map(m) {
if (lt_map->size() > 0) {
set_iterator_and_current_db();
}
}
void set_iterator_and_current_db() {
txn_lt_key_ranges ranges;
const int r = lt_map->fetch(which_lt, &ranges);
invariant_zero(r);
current_db = locked_get_db_by_dict_id(env, ranges.lt->get_dict_id());
iter.create(ranges.buffer);
}
DB_ENV *env;
DB *current_db;
size_t which_lt;
toku::omt<txn_lt_key_ranges> *lt_map;
toku::range_buffer::iterator iter;
toku::range_buffer::iterator::record rec;
};
static int iter_txn_row_locks_callback(DB **db, DBT *left_key, DBT *right_key, void *extra) {
iter_txn_row_locks_callback_extra *info =
reinterpret_cast<iter_txn_row_locks_callback_extra *>(extra);
while (info->which_lt < info->lt_map->size()) {
const bool more = info->iter.current(&info->rec);
if (more) {
*db = info->current_db;
// The caller should interpret data/size == 0 to mean infinity.
// Therefore, when we copyref pos/neg infinity into left/right_key,
// the caller knows what we're talking about.
toku_copyref_dbt(left_key, *info->rec.get_left_key());
toku_copyref_dbt(right_key, *info->rec.get_right_key());
info->iter.next();
return 0;
} else {
info->which_lt++;
if (info->which_lt < info->lt_map->size()) {
info->set_iterator_and_current_db();
}
}
}
return DB_NOTFOUND;
}
struct iter_txns_callback_extra {
iter_txns_callback_extra(DB_ENV *e, iterate_transactions_callback cb, void *ex) :
env(e), callback(cb), extra(ex) {
}
DB_ENV *env;
iterate_transactions_callback callback;
void *extra;
};
static int iter_txns_callback(TOKUTXN txn, void *extra) {
iter_txns_callback_extra *info =
reinterpret_cast<iter_txns_callback_extra *>(extra);
DB_TXN *dbtxn = toku_txn_get_container_db_txn(txn);
invariant_notnull(dbtxn);
toku_mutex_lock(&db_txn_struct_i(dbtxn)->txn_mutex);
toku_pthread_rwlock_rdlock(&info->env->i->open_dbs_rwlock);
iter_txn_row_locks_callback_extra e(info->env, &db_txn_struct_i(dbtxn)->lt_map);
const int r = info->callback(toku_txn_get_txnid(txn).parent_id64,
toku_txn_get_client_id(txn),
iter_txn_row_locks_callback,
&e,
info->extra);
toku_pthread_rwlock_rdunlock(&info->env->i->open_dbs_rwlock);
toku_mutex_unlock(&db_txn_struct_i(dbtxn)->txn_mutex);
return r;
}
static int
env_iterate_live_transactions(DB_ENV *env,
iterate_transactions_callback callback,
void *extra) {
if (!env_opened(env)) {
return EINVAL;
}
TXN_MANAGER txn_manager = toku_logger_get_txn_manager(env->i->logger);
iter_txns_callback_extra e(env, callback, extra);
return toku_txn_manager_iter_over_live_root_txns(txn_manager, iter_txns_callback, &e);
}
static int static int
toku_env_create(DB_ENV ** envp, uint32_t flags) { toku_env_create(DB_ENV ** envp, uint32_t flags) {
int r = ENOSYS; int r = ENOSYS;
...@@ -2307,12 +2478,15 @@ toku_env_create(DB_ENV ** envp, uint32_t flags) { ...@@ -2307,12 +2478,15 @@ toku_env_create(DB_ENV ** envp, uint32_t flags) {
USENV(txn_stat); USENV(txn_stat);
USENV(get_lock_timeout); USENV(get_lock_timeout);
USENV(set_lock_timeout); USENV(set_lock_timeout);
USENV(set_lock_timeout_callback);
USENV(set_redzone); USENV(set_redzone);
USENV(log_flush); USENV(log_flush);
USENV(log_archive); USENV(log_archive);
USENV(create_loader); USENV(create_loader);
USENV(get_cursor_for_persistent_environment); USENV(get_cursor_for_persistent_environment);
USENV(get_cursor_for_directory); USENV(get_cursor_for_directory);
USENV(iterate_pending_lock_requests);
USENV(iterate_live_transactions);
USENV(change_fsync_log_period); USENV(change_fsync_log_period);
#undef USENV #undef USENV
...@@ -2350,10 +2524,11 @@ toku_env_create(DB_ENV ** envp, uint32_t flags) { ...@@ -2350,10 +2524,11 @@ toku_env_create(DB_ENV ** envp, uint32_t flags) {
// The escalate callback will need it to translate txnids to DB_TXNs // The escalate callback will need it to translate txnids to DB_TXNs
result->i->ltm.create(toku_db_lt_on_create_callback, toku_db_lt_on_destroy_callback, toku_db_txn_escalate_callback, result); result->i->ltm.create(toku_db_lt_on_create_callback, toku_db_lt_on_destroy_callback, toku_db_txn_escalate_callback, result);
r = toku_omt_create(&result->i->open_dbs); r = toku_omt_create(&result->i->open_dbs_by_dname);
toku_mutex_init(&result->i->open_dbs_lock, NULL);
assert_zero(r); assert_zero(r);
assert(result->i->open_dbs); r = toku_omt_create(&result->i->open_dbs_by_dict_id);
assert_zero(r);
toku_pthread_rwlock_init(&result->i->open_dbs_rwlock, NULL);
*envp = result; *envp = result;
r = 0; r = 0;
...@@ -2378,7 +2553,7 @@ DB_ENV_CREATE_FUN (DB_ENV ** envp, uint32_t flags) { ...@@ -2378,7 +2553,7 @@ DB_ENV_CREATE_FUN (DB_ENV ** envp, uint32_t flags) {
// return <0 if v is earlier in omt than dbv // return <0 if v is earlier in omt than dbv
// return >0 if v is later in omt than dbv // return >0 if v is later in omt than dbv
static int static int
find_db_by_db (OMTVALUE v, void *dbv) { find_db_by_db_dname(OMTVALUE v, void *dbv) {
DB *db = (DB *) v; // DB* that is stored in the omt DB *db = (DB *) v; // DB* that is stored in the omt
DB *dbfind = (DB *) dbv; // extra, to be compared to v DB *dbfind = (DB *) dbv; // extra, to be compared to v
int cmp; int cmp;
...@@ -2391,42 +2566,78 @@ find_db_by_db (OMTVALUE v, void *dbv) { ...@@ -2391,42 +2566,78 @@ find_db_by_db (OMTVALUE v, void *dbv) {
return 0; return 0;
} }
static int
find_db_by_db_dict_id(OMTVALUE v, void *dbv) {
DB *db = (DB *) v;
DB *dbfind = (DB *) dbv;
DICTIONARY_ID dict_id = db->i->dict_id;
DICTIONARY_ID dict_id_find = dbfind->i->dict_id;
if (dict_id.dictid < dict_id_find.dictid) {
return -1;
} else if (dict_id.dictid > dict_id_find.dictid) {
return 1;
} else if (db < dbfind) {
return -1;
} else if (db > dbfind) {
return 1;
} else {
return 0;
}
}
// Tell env that there is a new db handle (with non-unique dname in db->i-dname) // Tell env that there is a new db handle (with non-unique dname in db->i-dname)
void void
env_note_db_opened(DB_ENV *env, DB *db) { env_note_db_opened(DB_ENV *env, DB *db) {
toku_mutex_lock(&env->i->open_dbs_lock); toku_pthread_rwlock_wrlock(&env->i->open_dbs_rwlock);
assert(db->i->dname); // internal (non-user) dictionary has no dname assert(db->i->dname); // internal (non-user) dictionary has no dname
int r; int r;
OMTVALUE dbv; OMTVALUE v;
uint32_t idx; uint32_t idx;
STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) = toku_omt_size(env->i->open_dbs); r = toku_omt_find_zero(env->i->open_dbs_by_dname, find_db_by_db_dname,
db, &v, &idx);
assert(r == DB_NOTFOUND);
r = toku_omt_insert_at(env->i->open_dbs_by_dname, db, idx);
assert_zero(r);
r = toku_omt_find_zero(env->i->open_dbs_by_dict_id, find_db_by_db_dict_id,
db, &v, &idx);
assert(r == DB_NOTFOUND);
r = toku_omt_insert_at(env->i->open_dbs_by_dict_id, db, idx);
assert_zero(r);
STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) = toku_omt_size(env->i->open_dbs_by_dname);
STATUS_VALUE(YDB_LAYER_NUM_DB_OPEN)++; STATUS_VALUE(YDB_LAYER_NUM_DB_OPEN)++;
if (STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) > STATUS_VALUE(YDB_LAYER_MAX_OPEN_DBS)) if (STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) > STATUS_VALUE(YDB_LAYER_MAX_OPEN_DBS)) {
STATUS_VALUE(YDB_LAYER_MAX_OPEN_DBS) = STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS); STATUS_VALUE(YDB_LAYER_MAX_OPEN_DBS) = STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS);
r = toku_omt_find_zero(env->i->open_dbs, find_db_by_db, db, &dbv, &idx); }
assert(r==DB_NOTFOUND); //Must not already be there. toku_pthread_rwlock_wrunlock(&env->i->open_dbs_rwlock);
r = toku_omt_insert_at(env->i->open_dbs, db, idx);
assert_zero(r);
toku_mutex_unlock(&env->i->open_dbs_lock);
} }
// Effect: Tell the DB_ENV that the DB is no longer in use by the user of the API. The DB may still be in use by the fractal tree internals. // Effect: Tell the DB_ENV that the DB is no longer in use by the user of the API. The DB may still be in use by the fractal tree internals.
void void
env_note_db_closed(DB_ENV *env, DB *db) { env_note_db_closed(DB_ENV *env, DB *db) {
toku_mutex_lock(&env->i->open_dbs_lock); toku_pthread_rwlock_wrlock(&env->i->open_dbs_rwlock);
assert(db->i->dname); // internal (non-user) dictionary has no dname assert(db->i->dname); // internal (non-user) dictionary has no dname
assert(toku_omt_size(env->i->open_dbs) > 0); assert(toku_omt_size(env->i->open_dbs_by_dname) > 0);
assert(toku_omt_size(env->i->open_dbs_by_dict_id) > 0);
int r; int r;
OMTVALUE dbv; OMTVALUE v;
uint32_t idx; uint32_t idx;
STATUS_VALUE(YDB_LAYER_NUM_DB_CLOSE)++; r = toku_omt_find_zero(env->i->open_dbs_by_dname, find_db_by_db_dname,
r = toku_omt_find_zero(env->i->open_dbs, find_db_by_db, db, &dbv, &idx); db, &v, &idx);
assert_zero(r); //Must already be there. assert_zero(r);
assert((DB*)dbv == db); r = toku_omt_delete_at(env->i->open_dbs_by_dname, idx);
r = toku_omt_delete_at(env->i->open_dbs, idx);
STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) = toku_omt_size(env->i->open_dbs);
assert_zero(r); assert_zero(r);
toku_mutex_unlock(&env->i->open_dbs_lock); r = toku_omt_find_zero(env->i->open_dbs_by_dict_id, find_db_by_db_dict_id,
db, &v, &idx);
assert_zero(r);
r = toku_omt_delete_at(env->i->open_dbs_by_dict_id, idx);
assert_zero(r);
STATUS_VALUE(YDB_LAYER_NUM_DB_CLOSE)++;
STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) = toku_omt_size(env->i->open_dbs_by_dname);
toku_pthread_rwlock_wrunlock(&env->i->open_dbs_rwlock);
} }
static int static int
...@@ -2446,8 +2657,8 @@ env_is_db_with_dname_open(DB_ENV *env, const char *dname) { ...@@ -2446,8 +2657,8 @@ env_is_db_with_dname_open(DB_ENV *env, const char *dname) {
bool rval; bool rval;
OMTVALUE dbv; OMTVALUE dbv;
uint32_t idx; uint32_t idx;
toku_mutex_lock(&env->i->open_dbs_lock); toku_pthread_rwlock_rdlock(&env->i->open_dbs_rwlock);
r = toku_omt_find_zero(env->i->open_dbs, find_open_db_by_dname, (void*)dname, &dbv, &idx); r = toku_omt_find_zero(env->i->open_dbs_by_dname, find_open_db_by_dname, (void*)dname, &dbv, &idx);
if (r==0) { if (r==0) {
DB *db = (DB *) dbv; DB *db = (DB *) dbv;
assert(strcmp(dname, db->i->dname) == 0); assert(strcmp(dname, db->i->dname) == 0);
...@@ -2457,7 +2668,7 @@ env_is_db_with_dname_open(DB_ENV *env, const char *dname) { ...@@ -2457,7 +2668,7 @@ env_is_db_with_dname_open(DB_ENV *env, const char *dname) {
assert(r==DB_NOTFOUND); assert(r==DB_NOTFOUND);
rval = false; rval = false;
} }
toku_mutex_unlock(&env->i->open_dbs_lock); toku_pthread_rwlock_rdunlock(&env->i->open_dbs_rwlock);
return rval; return rval;
} }
......
...@@ -725,6 +725,17 @@ toku_db_stat64(DB * db, DB_TXN *txn, DB_BTREE_STAT64 *s) { ...@@ -725,6 +725,17 @@ toku_db_stat64(DB * db, DB_TXN *txn, DB_BTREE_STAT64 *s) {
return 0; return 0;
} }
static const char *
toku_db_get_dname(DB *db) {
if (!db_opened(db)) {
return nullptr;
}
if (db->i->dname == nullptr) {
return "";
}
return db->i->dname;
}
static int static int
toku_db_keys_range64(DB* db, DB_TXN* txn __attribute__((__unused__)), DBT* keyleft, DBT* keyright, uint64_t* less, uint64_t* left, uint64_t* between, uint64_t *right, uint64_t *greater, bool* middle_3_exact) { toku_db_keys_range64(DB* db, DB_TXN* txn __attribute__((__unused__)), DBT* keyleft, DBT* keyright, uint64_t* less, uint64_t* left, uint64_t* between, uint64_t *right, uint64_t *greater, bool* middle_3_exact) {
HANDLE_PANICKED_DB(db); HANDLE_PANICKED_DB(db);
...@@ -1036,6 +1047,7 @@ toku_db_create(DB ** db, DB_ENV * env, uint32_t flags) { ...@@ -1036,6 +1047,7 @@ toku_db_create(DB ** db, DB_ENV * env, uint32_t flags) {
USDB(stat64); USDB(stat64);
USDB(get_fractal_tree_info64); USDB(get_fractal_tree_info64);
USDB(iterate_fractal_tree_block_map); USDB(iterate_fractal_tree_block_map);
USDB(get_dname);
USDB(verify_with_progress); USDB(verify_with_progress);
USDB(cursor); USDB(cursor);
USDB(dbt_pos_infty); USDB(dbt_pos_infty);
......
...@@ -250,9 +250,15 @@ int toku_db_start_range_lock(DB *db, DB_TXN *txn, const DBT *left_key, const DBT ...@@ -250,9 +250,15 @@ int toku_db_start_range_lock(DB *db, DB_TXN *txn, const DBT *left_key, const DBT
TXNID txn_anc_id = txn_anc->id64(txn_anc); TXNID txn_anc_id = txn_anc->id64(txn_anc);
request->set(db->i->lt, txn_anc_id, left_key, right_key, lock_type); request->set(db->i->lt, txn_anc_id, left_key, right_key, lock_type);
int r = request->start(); const int r = request->start();
if (r == 0) { if (r == 0) {
db_txn_note_row_lock(db, txn_anc, left_key, right_key); db_txn_note_row_lock(db, txn_anc, left_key, right_key);
} else if (r == DB_LOCK_DEADLOCK) {
lock_timeout_callback callback = txn->mgrp->i->lock_wait_timeout_callback;
if (callback != nullptr) {
callback(db, txn_anc_id, left_key, right_key,
request->get_conflicting_txnid());
}
} }
return r; return r;
} }
...@@ -260,12 +266,19 @@ int toku_db_start_range_lock(DB *db, DB_TXN *txn, const DBT *left_key, const DBT ...@@ -260,12 +266,19 @@ int toku_db_start_range_lock(DB *db, DB_TXN *txn, const DBT *left_key, const DBT
// Complete a lock request by waiting until the request is ready // Complete a lock request by waiting until the request is ready
// and then storing the acquired lock if successful. // and then storing the acquired lock if successful.
int toku_db_wait_range_lock(DB *db, DB_TXN *txn, toku::lock_request *request) { int toku_db_wait_range_lock(DB *db, DB_TXN *txn, toku::lock_request *request) {
int r = request->wait();
if (r == 0) {
DB_TXN *txn_anc = txn_oldest_ancester(txn); DB_TXN *txn_anc = txn_oldest_ancester(txn);
const DBT *left_key = request->get_left_key(); const DBT *left_key = request->get_left_key();
const DBT *right_key = request->get_right_key(); const DBT *right_key = request->get_right_key();
const int r = request->wait();
if (r == 0) {
db_txn_note_row_lock(db, txn_anc, left_key, right_key); db_txn_note_row_lock(db, txn_anc, left_key, right_key);
} else if (r == DB_LOCK_NOTGRANTED) {
lock_timeout_callback callback = txn->mgrp->i->lock_wait_timeout_callback;
if (callback != nullptr) {
callback(db, txn_anc->id64(txn_anc), left_key, right_key,
request->get_conflicting_txnid());
}
} }
return r; return r;
} }
......
...@@ -101,14 +101,12 @@ PATENT RIGHTS GRANT: ...@@ -101,14 +101,12 @@ PATENT RIGHTS GRANT:
#include "ydb_txn.h" #include "ydb_txn.h"
#include "ydb_row_lock.h" #include "ydb_row_lock.h"
static uint64_t static uint64_t toku_txn_id64(DB_TXN * txn) {
toku_txn_id64(DB_TXN * txn) {
HANDLE_PANICKED_ENV(txn->mgrp); HANDLE_PANICKED_ENV(txn->mgrp);
return toku_txn_get_root_id(db_txn_struct_i(txn)->tokutxn); return toku_txn_get_root_id(db_txn_struct_i(txn)->tokutxn);
} }
static void static void toku_txn_release_locks(DB_TXN *txn) {
toku_txn_release_locks(DB_TXN *txn) {
// Prevent access to the locktree map while releasing. // Prevent access to the locktree map while releasing.
// It is possible for lock escalation to attempt to // It is possible for lock escalation to attempt to
// modify this data structure while the txn commits. // modify this data structure while the txn commits.
...@@ -125,16 +123,14 @@ toku_txn_release_locks(DB_TXN *txn) { ...@@ -125,16 +123,14 @@ toku_txn_release_locks(DB_TXN *txn) {
toku_mutex_unlock(&db_txn_struct_i(txn)->txn_mutex); toku_mutex_unlock(&db_txn_struct_i(txn)->txn_mutex);
} }
static void static void toku_txn_destroy(DB_TXN *txn) {
toku_txn_destroy(DB_TXN *txn) {
db_txn_struct_i(txn)->lt_map.destroy(); db_txn_struct_i(txn)->lt_map.destroy();
toku_txn_destroy_txn(db_txn_struct_i(txn)->tokutxn); toku_txn_destroy_txn(db_txn_struct_i(txn)->tokutxn);
toku_mutex_destroy(&db_txn_struct_i(txn)->txn_mutex); toku_mutex_destroy(&db_txn_struct_i(txn)->txn_mutex);
toku_free(txn); toku_free(txn);
} }
static int static int toku_txn_commit(DB_TXN * txn, uint32_t flags,
toku_txn_commit(DB_TXN * txn, uint32_t flags,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra, TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra,
bool release_mo_lock, bool low_priority) { bool release_mo_lock, bool low_priority) {
HANDLE_PANICKED_ENV(txn->mgrp); HANDLE_PANICKED_ENV(txn->mgrp);
...@@ -208,15 +204,13 @@ cleanup: ...@@ -208,15 +204,13 @@ cleanup:
return r; return r;
} }
static uint32_t static uint32_t toku_txn_id(DB_TXN * txn) {
toku_txn_id(DB_TXN * txn) {
HANDLE_PANICKED_ENV(txn->mgrp); HANDLE_PANICKED_ENV(txn->mgrp);
abort(); abort();
return (uint32_t) -1; return (uint32_t) -1;
} }
static int static int toku_txn_abort(DB_TXN * txn,
toku_txn_abort(DB_TXN * txn,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) { TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) {
HANDLE_PANICKED_ENV(txn->mgrp); HANDLE_PANICKED_ENV(txn->mgrp);
//Recursively kill off children (abort or commit are both correct, commit is cheaper) //Recursively kill off children (abort or commit are both correct, commit is cheaper)
...@@ -248,8 +242,7 @@ toku_txn_abort(DB_TXN * txn, ...@@ -248,8 +242,7 @@ toku_txn_abort(DB_TXN * txn,
return r; return r;
} }
static int static int toku_txn_xa_prepare (DB_TXN *txn, TOKU_XA_XID *xid) {
toku_txn_xa_prepare (DB_TXN *txn, TOKU_XA_XID *xid) {
int r = 0; int r = 0;
if (!txn) { if (!txn) {
r = EINVAL; r = EINVAL;
...@@ -301,8 +294,7 @@ exit: ...@@ -301,8 +294,7 @@ exit:
// requires: must hold the multi operation lock. it is // requires: must hold the multi operation lock. it is
// released in toku_txn_xa_prepare before the fsync. // released in toku_txn_xa_prepare before the fsync.
static int static int toku_txn_prepare (DB_TXN *txn, uint8_t gid[DB_GID_SIZE]) {
toku_txn_prepare (DB_TXN *txn, uint8_t gid[DB_GID_SIZE]) {
TOKU_XA_XID xid; TOKU_XA_XID xid;
TOKU_ANNOTATE_NEW_MEMORY(&xid, sizeof(xid)); TOKU_ANNOTATE_NEW_MEMORY(&xid, sizeof(xid));
xid.formatID=0x756b6f54; // "Toku" xid.formatID=0x756b6f54; // "Toku"
...@@ -312,20 +304,17 @@ toku_txn_prepare (DB_TXN *txn, uint8_t gid[DB_GID_SIZE]) { ...@@ -312,20 +304,17 @@ toku_txn_prepare (DB_TXN *txn, uint8_t gid[DB_GID_SIZE]) {
return toku_txn_xa_prepare(txn, &xid); return toku_txn_xa_prepare(txn, &xid);
} }
static int static int toku_txn_txn_stat (DB_TXN *txn, struct txn_stat **txn_stat) {
toku_txn_txn_stat (DB_TXN *txn, struct txn_stat **txn_stat) {
XMALLOC(*txn_stat); XMALLOC(*txn_stat);
return toku_logger_txn_rollback_stats(db_txn_struct_i(txn)->tokutxn, *txn_stat); return toku_logger_txn_rollback_stats(db_txn_struct_i(txn)->tokutxn, *txn_stat);
} }
static int static int locked_txn_txn_stat (DB_TXN *txn, struct txn_stat **txn_stat) {
locked_txn_txn_stat (DB_TXN *txn, struct txn_stat **txn_stat) {
int r = toku_txn_txn_stat(txn, txn_stat); int r = toku_txn_txn_stat(txn, txn_stat);
return r; return r;
} }
static int static int locked_txn_commit_with_progress(DB_TXN *txn, uint32_t flags,
locked_txn_commit_with_progress(DB_TXN *txn, uint32_t flags,
TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra) { TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra) {
bool holds_mo_lock = false; bool holds_mo_lock = false;
bool low_priority = false; bool low_priority = false;
...@@ -349,8 +338,7 @@ locked_txn_commit_with_progress(DB_TXN *txn, uint32_t flags, ...@@ -349,8 +338,7 @@ locked_txn_commit_with_progress(DB_TXN *txn, uint32_t flags,
return r; return r;
} }
static int static int locked_txn_abort_with_progress(DB_TXN *txn,
locked_txn_abort_with_progress(DB_TXN *txn,
TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra) { TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra) {
// cannot begin a checkpoint // cannot begin a checkpoint
// the multi operation lock is taken the first time we // the multi operation lock is taken the first time we
...@@ -380,26 +368,33 @@ locked_txn_abort_with_progress(DB_TXN *txn, ...@@ -380,26 +368,33 @@ locked_txn_abort_with_progress(DB_TXN *txn,
return r; return r;
} }
int int locked_txn_commit(DB_TXN *txn, uint32_t flags) {
locked_txn_commit(DB_TXN *txn, uint32_t flags) {
int r = locked_txn_commit_with_progress(txn, flags, NULL, NULL); int r = locked_txn_commit_with_progress(txn, flags, NULL, NULL);
return r; return r;
} }
int int locked_txn_abort(DB_TXN *txn) {
locked_txn_abort(DB_TXN *txn) {
int r = locked_txn_abort_with_progress(txn, NULL, NULL); int r = locked_txn_abort_with_progress(txn, NULL, NULL);
return r; return r;
} }
static inline void static void locked_txn_set_client_id(DB_TXN *txn, uint64_t client_id) {
txn_func_init(DB_TXN *txn) { toku_txn_set_client_id(db_txn_struct_i(txn)->tokutxn, client_id);
}
static uint64_t locked_txn_get_client_id(DB_TXN *txn) {
return toku_txn_get_client_id(db_txn_struct_i(txn)->tokutxn);
}
static inline void txn_func_init(DB_TXN *txn) {
#define STXN(name) txn->name = locked_txn_ ## name #define STXN(name) txn->name = locked_txn_ ## name
STXN(abort); STXN(abort);
STXN(commit); STXN(commit);
STXN(abort_with_progress); STXN(abort_with_progress);
STXN(commit_with_progress); STXN(commit_with_progress);
STXN(txn_stat); STXN(txn_stat);
STXN(set_client_id);
STXN(get_client_id);
#undef STXN #undef STXN
#define SUTXN(name) txn->name = toku_txn_ ## name #define SUTXN(name) txn->name = toku_txn_ ## name
SUTXN(prepare); SUTXN(prepare);
...@@ -409,7 +404,6 @@ txn_func_init(DB_TXN *txn) { ...@@ -409,7 +404,6 @@ txn_func_init(DB_TXN *txn) {
txn->id64 = toku_txn_id64; txn->id64 = toku_txn_id64;
} }
// //
// Creates a transaction for the user // Creates a transaction for the user
// In our system, as far as the user is concerned, the rules are as follows: // In our system, as far as the user is concerned, the rules are as follows:
...@@ -422,8 +416,7 @@ txn_func_init(DB_TXN *txn) { ...@@ -422,8 +416,7 @@ txn_func_init(DB_TXN *txn) {
// - if a parent transaction is committed/aborted, the child transactions are recursively // - if a parent transaction is committed/aborted, the child transactions are recursively
// committed // committed
// //
int int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, uint32_t flags) {
toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, uint32_t flags) {
HANDLE_PANICKED_ENV(env); HANDLE_PANICKED_ENV(env);
HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, stxn); //Cannot create child while child already exists. HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, stxn); //Cannot create child while child already exists.
if (!toku_logger_is_open(env->i->logger)) if (!toku_logger_is_open(env->i->logger))
...@@ -543,6 +536,8 @@ toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, uint32_t flags) { ...@@ -543,6 +536,8 @@ toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, uint32_t flags) {
db_txn_struct_i(result)->iso = child_isolation; db_txn_struct_i(result)->iso = child_isolation;
db_txn_struct_i(result)->lt_map.create_no_array(); db_txn_struct_i(result)->lt_map.create_no_array();
toku_mutex_init(&db_txn_struct_i(result)->txn_mutex, NULL);
TXN_SNAPSHOT_TYPE snapshot_type; TXN_SNAPSHOT_TYPE snapshot_type;
switch(db_txn_struct_i(result)->iso){ switch(db_txn_struct_i(result)->iso){
case(TOKU_ISO_SNAPSHOT): case(TOKU_ISO_SNAPSHOT):
...@@ -582,8 +577,6 @@ toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, uint32_t flags) { ...@@ -582,8 +577,6 @@ toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, uint32_t flags) {
db_txn_struct_i(result->parent)->child = result; db_txn_struct_i(result->parent)->child = result;
} }
toku_mutex_init(&db_txn_struct_i(result)->txn_mutex, NULL);
*txn = result; *txn = result;
return 0; return 0;
} }
...@@ -605,7 +598,6 @@ void toku_keep_prepared_txn_callback (DB_ENV *env, TOKUTXN tokutxn) { ...@@ -605,7 +598,6 @@ void toku_keep_prepared_txn_callback (DB_ENV *env, TOKUTXN tokutxn) {
} }
// Test-only function // Test-only function
void void toku_increase_last_xid(DB_ENV *env, uint64_t increment) {
toku_increase_last_xid(DB_ENV *env, uint64_t increment) {
toku_txn_manager_increase_last_xid(toku_logger_get_txn_manager(env->i->logger), increment); toku_txn_manager_increase_last_xid(toku_logger_get_txn_manager(env->i->logger), increment);
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment