Commit 71cdee0e authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

#3529 merge serializable iso bug fix to mainline refs[t:3529]

git-svn-id: file:///svn/toku/tokudb@38569 c7de825b-a66e-492c-adef-691d508d4ae1
parent 610ec250
...@@ -4016,8 +4016,8 @@ brt_cursor_cleanup_dbts(BRT_CURSOR c) { ...@@ -4016,8 +4016,8 @@ brt_cursor_cleanup_dbts(BRT_CURSOR c) {
// For the above to NOT be true: // For the above to NOT be true:
// - id > context->snapshot_txnid64 OR id is in context's live root transaction list // - id > context->snapshot_txnid64 OR id is in context's live root transaction list
// //
static static int
int does_txn_read_entry(TXNID id, TOKUTXN context) { does_txn_read_entry(TXNID id, TOKUTXN context) {
int rval; int rval;
TXNID oldest_live_in_snapshot = toku_get_oldest_in_live_root_txn_list(context); TXNID oldest_live_in_snapshot = toku_get_oldest_in_live_root_txn_list(context);
if (id < oldest_live_in_snapshot || id == context->ancestor_txnid64) { if (id < oldest_live_in_snapshot || id == context->ancestor_txnid64) {
...@@ -4032,13 +4032,13 @@ int does_txn_read_entry(TXNID id, TOKUTXN context) { ...@@ -4032,13 +4032,13 @@ int does_txn_read_entry(TXNID id, TOKUTXN context) {
return rval; return rval;
} }
static inline void brt_cursor_extract_key_and_val( static inline void
LEAFENTRY le, brt_cursor_extract_key_and_val(LEAFENTRY le,
BRT_CURSOR cursor, BRT_CURSOR cursor,
u_int32_t *keylen, u_int32_t *keylen,
void **key, void **key,
u_int32_t *vallen, u_int32_t *vallen,
void **val) { void **val) {
if (toku_brt_cursor_is_leaf_mode(cursor)) { if (toku_brt_cursor_is_leaf_mode(cursor)) {
*key = le_key_and_len(le, keylen); *key = le_key_and_len(le, keylen);
*val = le; *val = le;
...@@ -4596,7 +4596,6 @@ maybe_apply_ancestors_messages_to_node (BRT t, BRTNODE node, ANCESTORS ancestors ...@@ -4596,7 +4596,6 @@ maybe_apply_ancestors_messages_to_node (BRT t, BRTNODE node, ANCESTORS ancestors
VERIFY_NODE(t, node); VERIFY_NODE(t, node);
} }
static int static int
brt_cursor_shortcut ( brt_cursor_shortcut (
BRT_CURSOR cursor, BRT_CURSOR cursor,
...@@ -4609,7 +4608,6 @@ brt_cursor_shortcut ( ...@@ -4609,7 +4608,6 @@ brt_cursor_shortcut (
void **val void **val
); );
// This is a bottom layer of the search functions. // This is a bottom layer of the search functions.
static int static int
brt_search_basement_node( brt_search_basement_node(
...@@ -4650,10 +4648,12 @@ brt_search_basement_node( ...@@ -4650,10 +4648,12 @@ brt_search_basement_node(
switch (search->direction) { switch (search->direction) {
case BRT_SEARCH_LEFT: case BRT_SEARCH_LEFT:
idx++; idx++;
if (idx>=toku_omt_size(bn->buffer)) return DB_NOTFOUND; if (idx >= toku_omt_size(bn->buffer))
return DB_NOTFOUND;
break; break;
case BRT_SEARCH_RIGHT: case BRT_SEARCH_RIGHT:
if (idx==0) return DB_NOTFOUND; if (idx == 0)
return DB_NOTFOUND;
idx--; idx--;
break; break;
default: default:
...@@ -4680,7 +4680,7 @@ brt_search_basement_node( ...@@ -4680,7 +4680,7 @@ brt_search_basement_node(
&val &val
); );
r = getf(keylen, key, vallen, val, getf_v); r = getf(keylen, key, vallen, val, getf_v, false);
if (r==0 || r == TOKUDB_CURSOR_CONTINUE) { if (r==0 || r == TOKUDB_CURSOR_CONTINUE) {
brtcursor->leaf_info.to_be.omt = bn->buffer; brtcursor->leaf_info.to_be.omt = bn->buffer;
brtcursor->leaf_info.to_be.index = idx; brtcursor->leaf_info.to_be.index = idx;
...@@ -4905,16 +4905,12 @@ static void ...@@ -4905,16 +4905,12 @@ static void
maybe_search_save_bound( maybe_search_save_bound(
BRTNODE node, BRTNODE node,
int child_searched, int child_searched,
brt_search_t *search brt_search_t *search)
)
{ {
DBT pivotkey;
toku_init_dbt(&pivotkey);
int p = (search->direction == BRT_SEARCH_LEFT) ? child_searched : child_searched - 1; int p = (search->direction == BRT_SEARCH_LEFT) ? child_searched : child_searched - 1;
if (p >=0 && p < node->n_children-1) { if (p >= 0 && p < node->n_children-1) {
struct kv_pair *pivot = node->childkeys[p]; struct kv_pair const * pivot = node->childkeys[p];
toku_fill_dbt(&pivotkey, kv_pair_key(pivot), kv_pair_keylen(pivot)); DBT pivotkey = { .data = kv_pair_key((struct kv_pair *) pivot), .size = kv_pair_keylen(pivot) };
search_save_bound(search, &pivotkey); search_save_bound(search, &pivotkey);
} }
} }
...@@ -4989,18 +4985,27 @@ brt_search_node( ...@@ -4989,18 +4985,27 @@ brt_search_node(
} }
// we have a new pivotkey // we have a new pivotkey
else { else {
if (node->height == 0) {
// when we run off the end of a basement, try to lock the range up to the pivot. solves #3529
struct kv_pair const * pivot = NULL;
if (search->direction == BRT_SEARCH_LEFT)
pivot = next_bounds.upper_bound_inclusive; // left -> right
else
pivot = next_bounds.lower_bound_exclusive; // right -> left
if (pivot) {
int rr = getf(kv_pair_keylen(pivot), kv_pair_key_const(pivot), 0, NULL, getf_v, true);
if (rr != 0)
return rr; // lock was not granted
}
}
// If we got a DB_NOTFOUND then we have to search the next record. Possibly everything present is not visible. // If we got a DB_NOTFOUND then we have to search the next record. Possibly everything present is not visible.
// This way of doing DB_NOTFOUND is a kludge, and ought to be simplified. Something like this is needed for DB_NEXT, but // This way of doing DB_NOTFOUND is a kludge, and ought to be simplified. Something like this is needed for DB_NEXT, but
// for point queries, it's overkill. If we got a DB_NOTFOUND on a point query then we should just stop looking. // for point queries, it's overkill. If we got a DB_NOTFOUND on a point query then we should just stop looking.
// When releasing locks on I/O we must not search the same subtree again, or we won't be guaranteed to make forward progress. // When releasing locks on I/O we must not search the same subtree again, or we won't be guaranteed to make forward progress.
// If we got a DB_NOTFOUND, then the pivot is too small if searching from left to right (too large if searching from right to left). // If we got a DB_NOTFOUND, then the pivot is too small if searching from left to right (too large if searching from right to left).
// So save the pivot key in the search object. // So save the pivot key in the search object.
// printf("%*ssave_bound %s\n", 9-node->height, "", (char*)pivotkey.data); maybe_search_save_bound(node, child_to_search, search);
maybe_search_save_bound(
node,
child_to_search,
search
);
} }
// not really necessary, just put this here so that reading the // not really necessary, just put this here so that reading the
// code becomes simpler. The point is at this point in the code, // code becomes simpler. The point is at this point in the code,
...@@ -5124,7 +5129,7 @@ toku_brt_search (BRT brt, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf, ...@@ -5124,7 +5129,7 @@ toku_brt_search (BRT brt, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf,
//TODO: #1378 This is not the ultimate location of this call to the //TODO: #1378 This is not the ultimate location of this call to the
//callback. It is surely wrong for node-level locking, and probably //callback. It is surely wrong for node-level locking, and probably
//wrong for the STRADDLE callback for heaviside function(two sets of key/vals) //wrong for the STRADDLE callback for heaviside function(two sets of key/vals)
int r2 = getf(0,NULL, 0,NULL, getf_v); int r2 = getf(0,NULL, 0,NULL, getf_v, false);
if (r2!=0) r = r2; if (r2!=0) r = r2;
} }
...@@ -5184,20 +5189,20 @@ static int brt_cursor_compare_set(brt_search_t *search, DBT *x) { ...@@ -5184,20 +5189,20 @@ static int brt_cursor_compare_set(brt_search_t *search, DBT *x) {
static int static int
brt_cursor_current_getf(ITEMLEN keylen, bytevec key, brt_cursor_current_getf(ITEMLEN keylen, bytevec key,
ITEMLEN vallen, bytevec val, ITEMLEN vallen, bytevec val,
void *v) { void *v, bool lock_only) {
struct brt_cursor_search_struct *bcss = v; struct brt_cursor_search_struct *bcss = v;
int r; int r;
if (key==NULL) { if (key==NULL) {
r = bcss->getf(0, NULL, 0, NULL, bcss->getf_v); r = bcss->getf(0, NULL, 0, NULL, bcss->getf_v, lock_only);
} else { } else {
BRT_CURSOR cursor = bcss->cursor; BRT_CURSOR cursor = bcss->cursor;
DBT newkey = {.size=keylen, .data=(void*)key}; // initializes other fields to zero DBT newkey = {.size=keylen, .data=(void*)key}; // initializes other fields to zero
if (compare_k_x(cursor->brt, &cursor->key, &newkey) != 0) { if (compare_k_x(cursor->brt, &cursor->key, &newkey) != 0) {
r = bcss->getf(0, NULL, 0, NULL, bcss->getf_v); // This was once DB_KEYEMPTY r = bcss->getf(0, NULL, 0, NULL, bcss->getf_v, lock_only); // This was once DB_KEYEMPTY
if (r==0) r = TOKUDB_FOUND_BUT_REJECTED; if (r==0) r = TOKUDB_FOUND_BUT_REJECTED;
} }
else else
r = bcss->getf(keylen, key, vallen, val, bcss->getf_v); r = bcss->getf(keylen, key, vallen, val, bcss->getf_v, lock_only);
} }
return r; return r;
} }
...@@ -5214,13 +5219,13 @@ toku_brt_cursor_current(BRT_CURSOR cursor, int op, BRT_GET_CALLBACK_FUNCTION get ...@@ -5214,13 +5219,13 @@ toku_brt_cursor_current(BRT_CURSOR cursor, int op, BRT_GET_CALLBACK_FUNCTION get
brt_search_finish(&search); brt_search_finish(&search);
return r; return r;
} }
return getf(cursor->key.size, cursor->key.data, cursor->val.size, cursor->val.data, getf_v); // brt_cursor_copyout(cursor, outkey, outval); return getf(cursor->key.size, cursor->key.data, cursor->val.size, cursor->val.data, getf_v, false); // brt_cursor_copyout(cursor, outkey, outval);
} }
static int static int
brt_flatten_getf(ITEMLEN UU(keylen), bytevec UU(key), brt_flatten_getf(ITEMLEN UU(keylen), bytevec UU(key),
ITEMLEN UU(vallen), bytevec UU(val), ITEMLEN UU(vallen), bytevec UU(val),
void *UU(v)) { void *UU(v), bool UU(lock_only)) {
return DB_NOTFOUND; return DB_NOTFOUND;
} }
...@@ -5286,8 +5291,8 @@ brt_cursor_shortcut ( ...@@ -5286,8 +5291,8 @@ brt_cursor_shortcut (
u_int32_t limit = (direction > 0) ? (toku_omt_size(omt) - 1) : 0; u_int32_t limit = (direction > 0) ? (toku_omt_size(omt) - 1) : 0;
//Starting with the prev, find the first real (non-provdel) leafentry. //Starting with the prev, find the first real (non-provdel) leafentry.
OMTVALUE le = NULL;
while (index != limit) { while (index != limit) {
OMTVALUE le = NULL;
index += direction; index += direction;
r = toku_omt_fetch(omt, index, &le); r = toku_omt_fetch(omt, index, &le);
assert_zero(r); assert_zero(r);
...@@ -5303,12 +5308,12 @@ brt_cursor_shortcut ( ...@@ -5303,12 +5308,12 @@ brt_cursor_shortcut (
val val
); );
r = getf(*keylen, *key, *vallen, *val, getf_v); r = getf(*keylen, *key, *vallen, *val, getf_v, false);
if (r==0 || r == TOKUDB_CURSOR_CONTINUE) { if (r == 0 || r == TOKUDB_CURSOR_CONTINUE) {
//Update cursor. //Update cursor.
cursor->leaf_info.to_be.index = index; cursor->leaf_info.to_be.index = index;
} }
if (r== TOKUDB_CURSOR_CONTINUE) { if (r == TOKUDB_CURSOR_CONTINUE) {
continue; continue;
} }
else { else {
...@@ -5316,6 +5321,7 @@ brt_cursor_shortcut ( ...@@ -5316,6 +5321,7 @@ brt_cursor_shortcut (
} }
} }
} }
return r; return r;
} }
...@@ -5332,18 +5338,18 @@ toku_brt_cursor_next(BRT_CURSOR cursor, BRT_GET_CALLBACK_FUNCTION getf, void *ge ...@@ -5332,18 +5338,18 @@ toku_brt_cursor_next(BRT_CURSOR cursor, BRT_GET_CALLBACK_FUNCTION getf, void *ge
static int static int
brt_cursor_search_eq_k_x_getf(ITEMLEN keylen, bytevec key, brt_cursor_search_eq_k_x_getf(ITEMLEN keylen, bytevec key,
ITEMLEN vallen, bytevec val, ITEMLEN vallen, bytevec val,
void *v) { void *v, bool lock_only) {
struct brt_cursor_search_struct *bcss = v; struct brt_cursor_search_struct *bcss = v;
int r; int r;
if (key==NULL) { if (key==NULL) {
r = bcss->getf(0, NULL, 0, NULL, bcss->getf_v); r = bcss->getf(0, NULL, 0, NULL, bcss->getf_v, false);
} else { } else {
BRT_CURSOR cursor = bcss->cursor; BRT_CURSOR cursor = bcss->cursor;
DBT newkey = {.size=keylen, .data=(void*)key}; // initializes other fields to zero DBT newkey = {.size=keylen, .data=(void*)key}; // initializes other fields to zero
if (compare_k_x(cursor->brt, bcss->search->k, &newkey) == 0) { if (compare_k_x(cursor->brt, bcss->search->k, &newkey) == 0) {
r = bcss->getf(keylen, key, vallen, val, bcss->getf_v); r = bcss->getf(keylen, key, vallen, val, bcss->getf_v, lock_only);
} else { } else {
r = bcss->getf(0, NULL, 0, NULL, bcss->getf_v); r = bcss->getf(0, NULL, 0, NULL, bcss->getf_v, lock_only);
if (r==0) r = TOKUDB_FOUND_BUT_REJECTED; if (r==0) r = TOKUDB_FOUND_BUT_REJECTED;
} }
} }
...@@ -5507,7 +5513,7 @@ toku_brt_lookup (BRT brt, DBT *k, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v) ...@@ -5507,7 +5513,7 @@ toku_brt_lookup (BRT brt, DBT *k, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v)
/* ********************************* delete **************************************/ /* ********************************* delete **************************************/
static int static int
getf_nothing (ITEMLEN UU(keylen), bytevec UU(key), ITEMLEN UU(vallen), bytevec UU(val), void *UU(pair_v)) { getf_nothing (ITEMLEN UU(keylen), bytevec UU(key), ITEMLEN UU(vallen), bytevec UU(val), void *UU(pair_v), bool UU(lock_only)) {
return 0; return 0;
} }
......
...@@ -24,9 +24,11 @@ C_BEGIN ...@@ -24,9 +24,11 @@ C_BEGIN
// The cursor object will have been updated (so that if result==0 the current value is the value being passed) // The cursor object will have been updated (so that if result==0 the current value is the value being passed)
// (If r!=0 then the cursor won't have been updated.) // (If r!=0 then the cursor won't have been updated.)
// If r!=0, it's up to the callback function to return that value of r. // If r!=0, it's up to the callback function to return that value of r.
//A 'key' bytevec of NULL means that element is not found (effectively infinity or // A 'key' bytevec of NULL means that element is not found (effectively infinity or
//-infinity depending on direction) // -infinity depending on direction)
typedef int(*BRT_GET_CALLBACK_FUNCTION)(ITEMLEN, bytevec, ITEMLEN, bytevec, void*); // When lock_only is false, the callback does optional lock tree locking and then processes the key and val.
// When lock_only is true, the callback only does optional lock tree locking.
typedef int(*BRT_GET_CALLBACK_FUNCTION)(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, void *extra, bool lock_only);
int toku_open_brt (const char *fname, int is_create, BRT *, int nodesize, int basementnodesize, CACHETABLE, TOKUTXN, int(*)(DB *,const DBT*,const DBT*), DB*) __attribute__ ((warn_unused_result)); int toku_open_brt (const char *fname, int is_create, BRT *, int nodesize, int basementnodesize, CACHETABLE, TOKUTXN, int(*)(DB *,const DBT*,const DBT*), DB*) __attribute__ ((warn_unused_result));
int toku_brt_change_descriptor(BRT t, const DBT* old_descriptor, const DBT* new_descriptor, BOOL do_log, TOKUTXN txn); int toku_brt_change_descriptor(BRT t, const DBT* old_descriptor, const DBT* new_descriptor, BOOL do_log, TOKUTXN txn);
......
...@@ -62,10 +62,14 @@ struct le_cursor_callback_arg { ...@@ -62,10 +62,14 @@ struct le_cursor_callback_arg {
// copy the key and the leaf entry to the given DBTs // copy the key and the leaf entry to the given DBTs
static int static int
le_cursor_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, void *v) { le_cursor_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, void *v, bool lock_only) {
struct le_cursor_callback_arg *arg = (struct le_cursor_callback_arg *) v; if (lock_only) {
toku_dbt_set(keylen, key, arg->key, NULL); ; // do nothing
toku_dbt_set(vallen, val, arg->val, NULL); } else {
struct le_cursor_callback_arg *arg = (struct le_cursor_callback_arg *) v;
toku_dbt_set(keylen, key, arg->key, NULL);
toku_dbt_set(vallen, val, arg->val, NULL);
}
return 0; return 0;
} }
......
...@@ -11,7 +11,8 @@ static TOKUTXN const null_txn = 0; ...@@ -11,7 +11,8 @@ static TOKUTXN const null_txn = 0;
static DB * const null_db = 0; static DB * const null_db = 0;
static int static int
save_data (ITEMLEN UU(keylen), bytevec UU(key), ITEMLEN vallen, bytevec val, void *v) { save_data (ITEMLEN UU(keylen), bytevec UU(key), ITEMLEN vallen, bytevec val, void *v, bool lock_only) {
if (lock_only) return 0;
assert(key!=NULL); assert(key!=NULL);
void **vp = v; void **vp = v;
*vp = toku_memdup(val, vallen); *vp = toku_memdup(val, vallen);
......
...@@ -379,10 +379,11 @@ static void test_brt_cursor_rwalk(int n, DB *db) { ...@@ -379,10 +379,11 @@ static void test_brt_cursor_rwalk(int n, DB *db) {
} }
static int static int
ascending_key_string_checkf (ITEMLEN keylen, bytevec key, ITEMLEN UU(vallen), bytevec UU(val), void *v) ascending_key_string_checkf (ITEMLEN keylen, bytevec key, ITEMLEN UU(vallen), bytevec UU(val), void *v, bool lock_only)
// the keys are strings. Verify that they keylen matches the key, that the keys are ascending. Use (char**)v to hold a // the keys are strings. Verify that they keylen matches the key, that the keys are ascending. Use (char**)v to hold a
// malloc'd previous string. // malloc'd previous string.
{ {
if (lock_only) return 0;
if (key!=NULL) { if (key!=NULL) {
assert(keylen == 1+strlen(key)); assert(keylen == 1+strlen(key));
char **prevkeyp = v; char **prevkeyp = v;
......
...@@ -62,20 +62,22 @@ struct check_pair { ...@@ -62,20 +62,22 @@ struct check_pair {
int call_count; int call_count;
}; };
static int static int
lookup_checkf (ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, void *pair_v) { lookup_checkf (ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, void *pair_v, bool lock_only) {
struct check_pair *pair = (struct check_pair *) pair_v; if (!lock_only) {
if (key!=NULL) { struct check_pair *pair = (struct check_pair *) pair_v;
if (pair->keylen!=len_ignore) { if (key!=NULL) {
assert(pair->keylen == keylen); if (pair->keylen!=len_ignore) {
if (pair->key) assert(pair->keylen == keylen);
assert(memcmp(pair->key, key, keylen)==0); if (pair->key)
} assert(memcmp(pair->key, key, keylen)==0);
if (pair->vallen!=len_ignore) { }
assert(pair->vallen == vallen); if (pair->vallen!=len_ignore) {
if (pair->val) assert(pair->vallen == vallen);
assert(memcmp(pair->val, val, vallen)==0); if (pair->val)
} assert(memcmp(pair->val, val, vallen)==0);
pair->call_count++; // this call_count is really how many calls were made with r==0 }
pair->call_count++; // this call_count is really how many calls were made with r==0
}
} }
return 0; return 0;
} }
......
...@@ -24,9 +24,9 @@ string_cmp(DB* UU(db), const DBT *a, const DBT *b) ...@@ -24,9 +24,9 @@ string_cmp(DB* UU(db), const DBT *a, const DBT *b)
} }
static int static int
found(ITEMLEN UU(keylen), bytevec key, ITEMLEN UU(vallen), bytevec UU(val), void *UU(extra)) found(ITEMLEN UU(keylen), bytevec key, ITEMLEN UU(vallen), bytevec UU(val), void *UU(extra), bool lock_only)
{ {
assert(key != NULL); assert(key != NULL && !lock_only);
return 0; return 0;
} }
......
...@@ -253,6 +253,8 @@ BDB_DONTRUN_TESTS = \ ...@@ -253,6 +253,8 @@ BDB_DONTRUN_TESTS = \
test3522b \ test3522b \
test938c \ test938c \
test_3645 \ test_3645 \
test_3529_insert_2 \
test_3529_table_lock \
test_3755 \ test_3755 \
test_4015 \ test_4015 \
test_abort1 \ test_abort1 \
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007-2011 Tokutek Inc. All rights reserved."
#include "test.h"
// verify that serializable cursor locks deleted keys so that another transaction can not insert into the range being scanned by the cursor
// we create 2 level tree that looks like
// root node with pivot key 2
// left leaf contains keys 0, 1, and 2
// right leaf contains keys 3 and 4
// we delete key 2 while a snapshot txn exists so that garbage collection does not occur.
// txn_a walks a cursor through the deleted keys.
// when txn_a finishes reading the deleted keys, txn_b tries to insert key 2 and should get lock not granted.
#include <db.h>
#include <unistd.h>
#include <sys/stat.h>
#include <pthread.h>
// Test state shared between run_test and the pread hook (my_pread / do_insert_2).
static DB_ENV *env = NULL;
static DB_TXN *txn_a = NULL;   // transaction that owns the cursor walked in run_test
static DB_TXN *txn_b = NULL;   // transaction used by do_insert_2 for the conflicting put
static DB *db = NULL;
static u_int32_t db_page_size = 4096;   // handed to db->set_pagesize in run_test
// static u_int32_t db_basement_size = 4096;
static char *envdir = ENVDIR;   // test_main replaces this when --envdir is given
// Key comparison function registered through set_default_bt_compare.
// The two keys are required to have equal length (asserted); the result is the
// memcmp ordering of their bytes.
static int
my_compare(DB *this_db UU(), const DBT *a UU(), const DBT *b UU()) {
    assert(a->size == b->size);
    const size_t nbytes = a->size;
    const int order = memcmp(a->data, b->data, nbytes);
    return order;
}
// Row generator registered through set_generate_row_callback_for_put.
// Produces a destination row that is a byte-for-byte copy of the source row.
//   dest_key, dest_val: must carry the DB_DBT_REALLOC flag (asserted); their
//                       buffers are resized with toku_realloc to the source sizes.
//   src_key, src_val:   the row being copied; not modified.
// Always returns 0. A failed allocation trips an assert instead of letting memcpy
// write through a NULL pointer.
static int
my_generate_row(DB *dest_db UU(), DB *src_db UU(), DBT *dest_key UU(), DBT *dest_val UU(), const DBT *src_key UU(), const DBT *src_val UU()) {
    assert(dest_key->flags == DB_DBT_REALLOC);
    void *key_buf = toku_realloc(dest_key->data, src_key->size);
    // a NULL result is only acceptable when there is nothing to copy
    assert(key_buf != NULL || src_key->size == 0);
    if (src_key->size > 0)
        memcpy(key_buf, src_key->data, src_key->size);
    dest_key->data = key_buf;
    dest_key->size = src_key->size;

    assert(dest_val->flags == DB_DBT_REALLOC);
    void *val_buf = toku_realloc(dest_val->data, src_val->size);
    assert(val_buf != NULL || src_val->size == 0);
    if (src_val->size > 0)
        memcpy(val_buf, src_val->data, src_val->size);
    dest_val->data = val_buf;
    dest_val->size = src_val->size;
    return 0;
}
// Row callback handed to c_getf_next in run_test.
// Ignores all of its arguments and always returns 0.
static int
next_do_nothing(DBT const *UU(a), DBT const *UU(b), void *UU(c)) {
    const int success = 0;
    return success;
}
// Thread body started from my_pread.
// Tries to put key 2 (with an 800-byte zeroed value) under txn_b and asserts
// that the put is refused with DB_LOCK_NOTGRANTED.
// Returns its argument unchanged.
static void *
do_insert_2(void *arg) {
    u_int64_t key = 2;
    char val[800];
    memset(val, 0, sizeof val);

    DBT k, v;
    DBT *kp = dbt_init(&k, &key, sizeof key);
    DBT *vp = dbt_init(&v, val, sizeof val);

    int r = db->put(db, txn_b, kp, vp, 0);
    assert(r == DB_LOCK_NOTGRANTED);
    return arg;
}
// pread replacement installed with db_env_set_func_pread.
// Counts its invocations; on the 5th one it runs do_insert_2 on a separate
// thread and waits for that thread to finish before doing the real read.
// Every call, including the 5th, forwards to pread and returns its result.
static ssize_t
my_pread (int fd, void *buf, size_t count, off_t offset) {
    static int my_pread_count = 0;
    if (++my_pread_count == 5) {
        pthread_t id;
        // if the thread could not be started, id is indeterminate and must not be joined
        int r = pthread_create(&id, NULL, do_insert_2, NULL);
        assert(r == 0);
        void *ret;
        r = pthread_join(id, &ret);
        assert(r == 0);
    }
    return pread(fd, buf, count, offset);
}
// Test driver.
//  1. creates the environment and the dictionary "foo.db" with page size db_page_size
//  2. loads keys 0..4 with 800-byte values through a loader (intended to give 2 leaf nodes)
//  3. deletes key 2 in its own transaction
//  4. closes and reopens the dictionary
//  5. begins txn_b, then walks a cursor owned by txn_a over the dictionary while
//     my_pread is installed as the pread hook; the hook runs do_insert_2, which
//     asserts that txn_b's insert of key 2 gets DB_LOCK_NOTGRANTED
//  6. commits both transactions and closes everything
// NOTE(review): the file header says key 2 is deleted while a snapshot txn exists,
// but no snapshot txn is begun in this function -- confirm whether one is needed.
static void
run_test(void) {
    int r;
    r = db_env_create(&env, 0); CKERR(r);
    env->set_errfile(env, stderr);
    r = env->set_redzone(env, 0); CKERR(r);
    r = env->set_generate_row_callback_for_put(env, my_generate_row); CKERR(r);
    r = env->set_default_bt_compare(env, my_compare); CKERR(r);
    r = env->open(env, envdir, DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
    r = db_create(&db, env, 0); CKERR(r);
    // the page size result used to be silently overwritten by the next call
    r = db->set_pagesize(db, db_page_size); CKERR(r);
    DB_TXN *txn = NULL;
    r = env->txn_begin(env, 0, &txn, 0); CKERR(r);
    r = db->open(db, txn, "foo.db", 0, DB_BTREE, DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
    r = txn->commit(txn, 0); CKERR(r);
    // build a tree with 2 leaf nodes
    r = env->txn_begin(env, 0, &txn, 0); CKERR(r);
    DB_LOADER *loader = NULL;
    r = env->create_loader(env, txn, &loader, db, 1, &db, NULL, NULL, 0); CKERR(r);
    for (u_int64_t i = 0; i < 5; i++) {
        u_int64_t key = i;
        char val[800]; memset(val, 0, sizeof val);
        DBT k,v;
        r = loader->put(loader, dbt_init(&k, &key, sizeof key), dbt_init(&v, val, sizeof val)); CKERR(r);
    }
    r = loader->close(loader); CKERR(r);
    r = txn->commit(txn, 0); CKERR(r);
    // delete key 2
    r = env->txn_begin(env, 0, &txn, 0); CKERR(r);
    for (u_int64_t i = 2; i < 3; i++) {
        u_int64_t key = i;
        DBT k;
        r = db->del(db, txn, dbt_init(&k, &key, sizeof key), 0); CKERR(r);
    }
    r = txn->commit(txn, 0); CKERR(r);
    // close and reopen
    r = db->close(db, 0); CKERR(r);
    r = db_create(&db, env, 0); CKERR(r);
    r = env->txn_begin(env, 0, &txn, 0); CKERR(r);
    r = db->open(db, txn, "foo.db", 0, DB_BTREE, 0, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
    r = txn->commit(txn, 0); CKERR(r);
    // create a txn that will try to insert key 2 while the serializable cursor is walking through the tree
    r = env->txn_begin(env, 0, &txn_b, 0); CKERR(r);
    // walk a serializable cursor through the tree
    r = env->txn_begin(env, 0, &txn_a, 0); CKERR(r);
    DBC *cursor = NULL;
    r = db->cursor(db, txn_a, &cursor, 0); CKERR(r);
    db_env_set_func_pread(my_pread);
    // keep stepping until c_getf_next reports anything other than success
    while (1) {
        r = cursor->c_getf_next(cursor, 0, next_do_nothing, NULL);
        if (r != 0)
            break;
    }
    db_env_set_func_pread(NULL);
    r = cursor->c_close(cursor); CKERR(r);
    r = txn_a->commit(txn_a, 0); CKERR(r);
    r = txn_b->commit(txn_b, 0); CKERR(r);
    r = db->close(db, 0); CKERR(r);
    r = env->close(env, 0); CKERR(r);
}
// Prints the accepted command line options to stderr, showing the current
// envdir value next to --envdir. Always returns 1.
static int
usage(void) {
    fputs("-v (verbose)\n", stderr);
    fputs("-q (quiet)\n", stderr);
    fprintf(stderr, "--envdir %s\n", envdir);
    return 1;
}
// Test entry point.
// Options: -v / --verbose raises verbose, -q lowers it (never below 0),
// --envdir <dir> overrides the environment directory. Any other argument
// (including --envdir with no value) returns usage().
// After parsing, the environment directory is removed with "rm -rf" and
// recreated, then run_test is executed. Returns 0 on completion.
int
test_main (int argc , char * const argv[]) {
    for (int i = 1; i < argc; i++) {
        const char *arg = argv[i];
        if (strcmp(arg, "-v") == 0 || strcmp(arg, "--verbose") == 0) {
            verbose++;
        } else if (strcmp(arg, "-q") == 0) {
            if (verbose > 0)
                verbose--;
        } else if (strcmp(arg, "--envdir") == 0 && i+1 < argc) {
            envdir = argv[++i];
        } else {
            return usage();
        }
    }

    // start from an empty environment directory
    char rmcmd[32 + strlen(envdir)];
    snprintf(rmcmd, sizeof rmcmd, "rm -rf %s", envdir);
    int r = system(rmcmd); CKERR(r);
    r = toku_os_mkdir(envdir, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);

    run_test();
    return 0;
}
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007-2011 Tokutek Inc. All rights reserved."
#include "test.h"
// verify that serializable cursor locks deleted keys so that another transaction can not insert into the range being scanned by the cursor
// we create 2 level tree that looks like
// root node with pivot key 2
// left leaf contains keys 0, 1, and 2
// right leaf contains keys 3 and 4
// we delete keys 0, 1, and 2 while a snapshot txn exists so that garbage collection does not occur.
// txn_a walks a cursor through the deleted keys.
// when txn_a finishes reading the deleted keys, txn_b tries to get a table lock.
// the table lock should fail since txn_a holds a read lock on the deleted key range.
#include <db.h>
#include <unistd.h>
#include <sys/stat.h>
// Test state shared between run_test and the pread hook (my_pread).
static DB_ENV *env = NULL;
static DB_TXN *txn_a = NULL;   // transaction that owns the cursor walked in run_test
static DB_TXN *txn_b = NULL;   // transaction my_pread uses for the table lock attempt
static DB *db = NULL;
static u_int32_t db_page_size = 4096;   // handed to db->set_pagesize in run_test
// static u_int32_t db_basement_size = 4096;
static char *envdir = ENVDIR;   // test_main replaces this when --envdir is given
// Key comparison function registered through set_default_bt_compare.
// The two keys are required to have equal length (asserted); the result is the
// memcmp ordering of their bytes.
static int
my_compare(DB *this_db UU(), const DBT *a UU(), const DBT *b UU()) {
    assert(a->size == b->size);
    const size_t nbytes = a->size;
    const int order = memcmp(a->data, b->data, nbytes);
    return order;
}
// Row generator registered through set_generate_row_callback_for_put.
// Produces a destination row that is a byte-for-byte copy of the source row.
//   dest_key, dest_val: must carry the DB_DBT_REALLOC flag (asserted); their
//                       buffers are resized with toku_realloc to the source sizes.
//   src_key, src_val:   the row being copied; not modified.
// Always returns 0. A failed allocation trips an assert instead of letting memcpy
// write through a NULL pointer.
static int
my_generate_row(DB *dest_db UU(), DB *src_db UU(), DBT *dest_key UU(), DBT *dest_val UU(), const DBT *src_key UU(), const DBT *src_val UU()) {
    assert(dest_key->flags == DB_DBT_REALLOC);
    void *key_buf = toku_realloc(dest_key->data, src_key->size);
    // a NULL result is only acceptable when there is nothing to copy
    assert(key_buf != NULL || src_key->size == 0);
    if (src_key->size > 0)
        memcpy(key_buf, src_key->data, src_key->size);
    dest_key->data = key_buf;
    dest_key->size = src_key->size;

    assert(dest_val->flags == DB_DBT_REALLOC);
    void *val_buf = toku_realloc(dest_val->data, src_val->size);
    assert(val_buf != NULL || src_val->size == 0);
    if (src_val->size > 0)
        memcpy(val_buf, src_val->data, src_val->size);
    dest_val->data = val_buf;
    dest_val->size = src_val->size;
    return 0;
}
// Row callback handed to c_getf_next in run_test.
// Ignores all of its arguments and always returns 0.
static int
next_do_nothing(DBT const *UU(a), DBT const *UU(b), void *UU(c)) {
    const int success = 0;
    return success;
}
// pread replacement installed with db_env_set_func_pread.
// Counts its invocations; on the 5th one it asks for a table lock on db under
// txn_b and asserts that the request is refused with DB_LOCK_NOTGRANTED.
// Every call, including the 5th, forwards to pread and returns its result.
static ssize_t
my_pread (int fd, void *buf, size_t count, off_t offset) {
    static int my_pread_count = 0;
    my_pread_count++;
    if (my_pread_count == 5) {
        // try to acquire a table lock, should fail
        const int lock_r = db->pre_acquire_table_lock(db, txn_b);
        assert(lock_r == DB_LOCK_NOTGRANTED);
    }
    const ssize_t nread = pread(fd, buf, count, offset);
    return nread;
}
// Test driver.
//  1. creates the environment and the dictionary "foo.db" with page size db_page_size
//  2. loads keys 0..4 with 800-byte values through a loader (intended to give 2 leaf nodes)
//  3. deletes keys 0, 1 and 2 while a DB_TXN_SNAPSHOT transaction is open
//  4. closes and reopens the dictionary
//  5. begins txn_b, then walks a cursor owned by txn_a over the dictionary while
//     my_pread is installed as the pread hook; the hook asserts that txn_b's
//     table lock request gets DB_LOCK_NOTGRANTED
//  6. commits both transactions and closes everything
static void
run_test(void) {
    int r;
    r = db_env_create(&env, 0); CKERR(r);
    env->set_errfile(env, stderr);
    r = env->set_redzone(env, 0); CKERR(r);
    r = env->set_generate_row_callback_for_put(env, my_generate_row); CKERR(r);
    r = env->set_default_bt_compare(env, my_compare); CKERR(r);
    r = env->open(env, envdir, DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
    r = db_create(&db, env, 0); CKERR(r);
    // the page size result used to be silently overwritten by the next call
    r = db->set_pagesize(db, db_page_size); CKERR(r);
    DB_TXN *txn = NULL;
    r = env->txn_begin(env, 0, &txn, 0); CKERR(r);
    r = db->open(db, txn, "foo.db", 0, DB_BTREE, DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
    r = txn->commit(txn, 0); CKERR(r);
    // build a tree with 2 leaf nodes
    r = env->txn_begin(env, 0, &txn, 0); CKERR(r);
    DB_LOADER *loader = NULL;
    r = env->create_loader(env, txn, &loader, db, 1, &db, NULL, NULL, 0); CKERR(r);
    for (u_int64_t i = 0; i < 5; i++) {
        u_int64_t key = i;
        char val[800]; memset(val, 0, sizeof val);
        DBT k,v;
        r = loader->put(loader, dbt_init(&k, &key, sizeof key), dbt_init(&v, val, sizeof val)); CKERR(r);
    }
    r = loader->close(loader); CKERR(r);
    r = txn->commit(txn, 0); CKERR(r);
    // this transaction ensures that garbage collection does not occur when deleting
    DB_TXN *bogus_txn = NULL;
    r = env->txn_begin(env, 0, &bogus_txn, DB_TXN_SNAPSHOT); CKERR(r);
    // delete the keys in the first leaf node
    r = env->txn_begin(env, 0, &txn, 0); CKERR(r);
    for (u_int64_t i = 0; i < 3; i++) {
        u_int64_t key = i;
        DBT k;
        r = db->del(db, txn, dbt_init(&k, &key, sizeof key), 0); CKERR(r);
    }
    r = txn->commit(txn, 0); CKERR(r);
    r = bogus_txn->commit(bogus_txn, 0); CKERR(r);
    // close and reopen
    r = db->close(db, 0); CKERR(r);
    r = db_create(&db, env, 0); CKERR(r);
    r = env->txn_begin(env, 0, &txn, 0); CKERR(r);
    r = db->open(db, txn, "foo.db", 0, DB_BTREE, 0, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
    r = txn->commit(txn, 0); CKERR(r);
    // create a txn that will try to acquire a table lock in the pread callback
    r = env->txn_begin(env, 0, &txn_b, 0); CKERR(r);
    // walk a serializable cursor through the tree
    r = env->txn_begin(env, 0, &txn_a, 0); CKERR(r);
    DBC *cursor = NULL;
    r = db->cursor(db, txn_a, &cursor, 0); CKERR(r);
    db_env_set_func_pread(my_pread);
    // keep stepping until c_getf_next reports anything other than success
    while (1) {
        r = cursor->c_getf_next(cursor, 0, next_do_nothing, NULL);
        if (r != 0)
            break;
    }
    db_env_set_func_pread(NULL);
    r = cursor->c_close(cursor); CKERR(r);
    r = txn_a->commit(txn_a, 0); CKERR(r);
    r = txn_b->commit(txn_b, 0); CKERR(r);
    r = db->close(db, 0); CKERR(r);
    r = env->close(env, 0); CKERR(r);
}
// Prints the accepted command line options to stderr, showing the current
// envdir value next to --envdir. Always returns 1.
static int
usage(void) {
    fputs("-v (verbose)\n", stderr);
    fputs("-q (quiet)\n", stderr);
    fprintf(stderr, "--envdir %s\n", envdir);
    return 1;
}
// Test entry point.
// Options: -v / --verbose raises verbose, -q lowers it (never below 0),
// --envdir <dir> overrides the environment directory. Any other argument
// (including --envdir with no value) returns usage().
// After parsing, the environment directory is removed with "rm -rf" and
// recreated, then run_test is executed. Returns 0 on completion.
int
test_main (int argc , char * const argv[]) {
    for (int i = 1; i < argc; i++) {
        const char *arg = argv[i];
        if (strcmp(arg, "-v") == 0 || strcmp(arg, "--verbose") == 0) {
            verbose++;
        } else if (strcmp(arg, "-q") == 0) {
            if (verbose > 0)
                verbose--;
        } else if (strcmp(arg, "--envdir") == 0 && i+1 < argc) {
            envdir = argv[++i];
        } else {
            return usage();
        }
    }

    // start from an empty environment directory
    char rmcmd[32 + strlen(envdir)];
    snprintf(rmcmd, sizeof rmcmd, "rm -rf %s", envdir);
    int r = system(rmcmd); CKERR(r);
    r = toku_os_mkdir(envdir, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);

    run_test();
    return 0;
}
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment