Commit 98d5d2c9 authored by Yoni Fogel's avatar Yoni Fogel

Addresses #368

Deals with NULL transactions in an INIT_TXN environment.
Does not deal with NULL txn cursors yet.
Also adds DB_AUTO_COMMIT

git-svn-id: file:///svn/tokudb@2252 c7de825b-a66e-492c-adef-691d508d4ae1
parent 40bdcc0d
......@@ -1999,10 +1999,14 @@ static inline void brt_cursor_cleanup(BRT_CURSOR cursor) {
dbt_cleanup(&cursor->val);
}
inline int brt_cursor_not_set(BRT_CURSOR cursor) {
static inline int brt_cursor_not_set(BRT_CURSOR cursor) {
return cursor->key.data == 0 || cursor->val.data == 0;
}
BOOL toku_brt_cursor_uninitialized(BRT_CURSOR c) {
return brt_cursor_not_set(c);
}
static inline void brt_cursor_set_key_val(BRT_CURSOR cursor, DBT *newkey, DBT *newval) {
brt_cursor_cleanup(cursor);
cursor->key = *newkey; memset(newkey, 0, sizeof *newkey);
......
......@@ -51,6 +51,7 @@ int toku_brt_cursor (BRT, BRT_CURSOR*);
int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int brtc_flags, TOKUTXN);
int toku_brt_cursor_delete(BRT_CURSOR cursor, int flags, TOKUTXN);
int toku_brt_cursor_close (BRT_CURSOR curs);
BOOL toku_brt_cursor_uninitialized(BRT_CURSOR c);
typedef struct brtenv *BRTENV;
int brtenv_checkpoint (BRTENV env);
......
......@@ -44,7 +44,7 @@ char* toku_lt_strerror(TOKU_LT_ERROR r) {
return "Unknown error in locking data structures.\n";
}
/* Compare two payloads assuming that at least one of them is infinite */
static int __toku_infinite_compare(void* a, void* b) {
inline static int __toku_infinite_compare(void* a, void* b) {
if (a == b) return 0;
if (a == toku_lt_infinity) return 1;
if (b == toku_lt_infinity) return -1;
......@@ -52,7 +52,7 @@ static int __toku_infinite_compare(void* a, void* b) {
assert(b == toku_lt_neg_infinity); return 1;
}
static BOOL __toku_lt_is_infinite(const void* p) {
inline static BOOL __toku_lt_is_infinite(const void* p) {
if (p == toku_lt_infinity || p == toku_lt_neg_infinity) {
DBT* dbt = (DBT*)p;
assert(!dbt->data && !dbt->size);
......@@ -63,19 +63,19 @@ static BOOL __toku_lt_is_infinite(const void* p) {
/* Verifies that NULL data and size are consistent.
i.e. The size is 0 if and only if the data is NULL. */
static void __toku_lt_verify_null_key(const DBT* key) {
assert(!key || __toku_lt_is_infinite(key) ||
(key->size > 0) == (key->data != NULL));
inline static int __toku_lt_verify_null_key(const DBT* key) {
if (key && key->size && !key->data) return EINVAL;
return 0;
}
static DBT* __toku_recreate_DBT(DBT* dbt, void* payload, u_int32_t length) {
inline static DBT* __toku_recreate_DBT(DBT* dbt, void* payload, u_int32_t length) {
memset(dbt, 0, sizeof(DBT));
dbt->data = payload;
dbt->size = length;
return dbt;
}
static int __toku_lt_txn_cmp(void* a, void* b) {
inline static int __toku_lt_txn_cmp(void* a, void* b) {
return a < b ? -1 : (a != b);
}
......@@ -118,27 +118,27 @@ int __toku_lt_point_cmp(void* a, void* b) {
/* Functions to update the range count and compare it with the
maximum number of ranges */
static BOOL __toku_lt_range_test_incr(toku_lock_tree* tree, u_int32_t replace) {
inline static BOOL __toku_lt_range_test_incr(toku_lock_tree* tree, u_int32_t replace) {
assert(tree);
assert(tree->num_ranges);
assert(replace <= *tree->num_ranges);
return *tree->num_ranges - replace < tree->max_ranges;
}
static void __toku_lt_range_incr(toku_lock_tree* tree, u_int32_t replace) {
inline static void __toku_lt_range_incr(toku_lock_tree* tree, u_int32_t replace) {
assert(__toku_lt_range_test_incr(tree, replace));
*tree->num_ranges -= replace;
*tree->num_ranges += 1;
}
static void __toku_lt_range_decr(toku_lock_tree* tree, u_int32_t ranges) {
inline static void __toku_lt_range_decr(toku_lock_tree* tree, u_int32_t ranges) {
assert(tree);
assert(tree->num_ranges);
assert(*tree->num_ranges >= ranges);
*tree->num_ranges -= ranges;
}
static void __toku_p_free(toku_lock_tree* tree, toku_point* point) {
inline static void __toku_p_free(toku_lock_tree* tree, toku_point* point) {
assert(point);
if (!__toku_lt_is_infinite(point->key_payload)) {
tree->free(point->key_payload);
......@@ -152,7 +152,7 @@ static void __toku_p_free(toku_lock_tree* tree, toku_point* point) {
/*
Allocate and copy the payload.
*/
static int __toku_payload_copy(toku_lock_tree* tree,
inline static int __toku_payload_copy(toku_lock_tree* tree,
void** payload_out, u_int32_t* len_out,
void* payload_in, u_int32_t len_in) {
assert(payload_out && len_out);
......@@ -171,7 +171,7 @@ static int __toku_payload_copy(toku_lock_tree* tree,
return 0;
}
static int __toku_p_makecopy(toku_lock_tree* tree, void** ppoint) {
inline static int __toku_p_makecopy(toku_lock_tree* tree, void** ppoint) {
assert(ppoint);
toku_point* point = *(toku_point**)ppoint;
toku_point* temp_point = NULL;
......@@ -219,7 +219,7 @@ toku_range_tree* __toku_lt_ifexist_selfwrite(toku_lock_tree* tree,
/* Provides access to a selfread tree for a particular transaction.
Creates it if it does not exist. */
static int __toku_lt_selfread(toku_lock_tree* tree, DB_TXN* txn,
inline static int __toku_lt_selfread(toku_lock_tree* tree, DB_TXN* txn,
toku_range_tree** pselfread) {
int r;
assert(tree && txn && pselfread);
......@@ -246,7 +246,7 @@ static int __toku_lt_selfread(toku_lock_tree* tree, DB_TXN* txn,
/* Provides access to a selfwrite tree for a particular transaction.
Creates it if it does not exist. */
static int __toku_lt_selfwrite(toku_lock_tree* tree, DB_TXN* txn,
inline static int __toku_lt_selfwrite(toku_lock_tree* tree, DB_TXN* txn,
toku_range_tree** pselfwrite) {
int r;
assert(tree && txn && pselfwrite);
......@@ -276,7 +276,7 @@ static int __toku_lt_selfwrite(toku_lock_tree* tree, DB_TXN* txn,
Uses the standard definition of dominated from the design document.
Determines whether 'query' is dominated by 'rt'.
*/
static int __toku_lt_rt_dominates(toku_lock_tree* tree, toku_range* query,
inline static int __toku_lt_rt_dominates(toku_lock_tree* tree, toku_range* query,
toku_range_tree* rt, BOOL* dominated) {
assert(tree && query && dominated);
if (!rt) {
......@@ -321,7 +321,7 @@ typedef enum
If exactly one range overlaps and its data != self, there might be a
conflict. We need to check the 'peer'write table to verify.
*/
static int __toku_lt_borderwrite_conflict(toku_lock_tree* tree, DB_TXN* self,
inline static int __toku_lt_borderwrite_conflict(toku_lock_tree* tree, DB_TXN* self,
toku_range* query,
toku_conflict* conflict, DB_TXN** peer) {
assert(tree && self && query && conflict && peer);
......@@ -355,7 +355,7 @@ static int __toku_lt_borderwrite_conflict(toku_lock_tree* tree, DB_TXN* self,
Uses the standard definition of 'query' meets 'tree' at 'data' from the
design document.
*/
static int __toku_lt_meets(toku_lock_tree* tree, toku_range* query,
inline static int __toku_lt_meets(toku_lock_tree* tree, toku_range* query,
toku_range_tree* rt, BOOL* met) {
assert(tree && query && rt && met);
const u_int32_t query_size = 1;
......@@ -385,7 +385,7 @@ static int __toku_lt_meets(toku_lock_tree* tree, toku_range* query,
Uses the standard definition of 'query' meets 'tree' at 'data' from the
design document.
*/
static int __toku_lt_meets_peer(toku_lock_tree* tree, toku_range* query,
inline static int __toku_lt_meets_peer(toku_lock_tree* tree, toku_range* query,
toku_range_tree* rt, DB_TXN* self, BOOL* met) {
assert(tree && query && rt && self && met);
assert(query->left == query->right);
......@@ -408,7 +408,7 @@ static int __toku_lt_meets_peer(toku_lock_tree* tree, toku_range* query,
Utility function to implement: (from design document)
if K meets E at v'!=t and K meets W_v' then return failure.
*/
static int __toku_lt_check_borderwrite_conflict(toku_lock_tree* tree,
inline static int __toku_lt_check_borderwrite_conflict(toku_lock_tree* tree,
DB_TXN* txn, toku_range* query) {
assert(tree && txn && query);
toku_conflict conflict;
......@@ -433,18 +433,21 @@ static int __toku_lt_check_borderwrite_conflict(toku_lock_tree* tree,
return 0;
}
static void __toku_payload_from_dbt(void** payload, u_int32_t* len,
inline static void __toku_payload_from_dbt(void** payload, u_int32_t* len,
const DBT* dbt) {
assert(payload && len && dbt);
if (__toku_lt_is_infinite(dbt)) *payload = (void*)dbt;
else {
assert(!dbt->data == !dbt->size);
else if (!dbt->size) {
*payload = NULL;
*len = 0;
} else {
assert(dbt->data);
*payload = dbt->data;
*len = dbt->size;
}
*len = dbt->size;
}
static void __toku_init_point(toku_point* point, toku_lock_tree* tree,
inline static void __toku_init_point(toku_point* point, toku_lock_tree* tree,
const DBT* key, const DBT* data) {
assert(point && tree && key);
assert(!tree->duplicates == !data);
......@@ -463,14 +466,14 @@ static void __toku_init_point(toku_point* point, toku_lock_tree* tree,
}
}
static void __toku_init_query(toku_range* query,
inline static void __toku_init_query(toku_range* query,
toku_point* left, toku_point* right) {
query->left = left;
query->right = right;
query->data = NULL;
}
static void __toku_init_insert(toku_range* to_insert,
inline static void __toku_init_insert(toku_range* to_insert,
toku_point* left, toku_point* right,
DB_TXN* txn) {
to_insert->left = left;
......@@ -480,12 +483,12 @@ static void __toku_init_insert(toku_range* to_insert,
/* Returns whether the point already exists
as an endpoint of the given range. */
static BOOL __toku_lt_p_independent(toku_point* point, toku_range* range) {
inline static BOOL __toku_lt_p_independent(toku_point* point, toku_range* range) {
assert(point && range);
return point != range->left && point != range->right;
}
static int __toku_lt_extend_extreme(toku_lock_tree* tree,toku_range* to_insert,
inline static int __toku_lt_extend_extreme(toku_lock_tree* tree,toku_range* to_insert,
BOOL* alloc_left, BOOL* alloc_right,
u_int32_t numfound) {
assert(to_insert && tree && alloc_left && alloc_right);
......@@ -517,7 +520,7 @@ static int __toku_lt_extend_extreme(toku_lock_tree* tree,toku_range* to_insert,
return 0;
}
static int __toku_lt_alloc_extreme(toku_lock_tree* tree, toku_range* to_insert,
inline static int __toku_lt_alloc_extreme(toku_lock_tree* tree, toku_range* to_insert,
BOOL alloc_left, BOOL* alloc_right) {
assert(to_insert && alloc_right);
BOOL copy_left = FALSE;
......@@ -547,7 +550,7 @@ static int __toku_lt_alloc_extreme(toku_lock_tree* tree, toku_range* to_insert,
return 0;
}
static int __toku_lt_delete_overlapping_ranges(toku_lock_tree* tree,
inline static int __toku_lt_delete_overlapping_ranges(toku_lock_tree* tree,
toku_range_tree* rt,
u_int32_t numfound) {
assert(tree && rt);
......@@ -561,7 +564,7 @@ static int __toku_lt_delete_overlapping_ranges(toku_lock_tree* tree,
return 0;
}
static int __toku_lt_free_points(toku_lock_tree* tree, toku_range* to_insert,
inline static int __toku_lt_free_points(toku_lock_tree* tree, toku_range* to_insert,
u_int32_t numfound, toku_range_tree *rt) {
assert(tree && to_insert);
assert(numfound <= tree->buflen);
......@@ -591,7 +594,7 @@ static int __toku_lt_free_points(toku_lock_tree* tree, toku_range* to_insert,
}
/* Consolidate the new range and all the overlapping ranges */
static int __toku_consolidate(toku_lock_tree* tree,
inline static int __toku_consolidate(toku_lock_tree* tree,
toku_range* query, toku_range* to_insert,
DB_TXN* txn) {
int r;
......@@ -656,7 +659,7 @@ static int __toku_consolidate(toku_lock_tree* tree,
return 0;
}
static void __toku_lt_init_full_query(toku_lock_tree* tree, toku_range* query,
inline static void __toku_lt_init_full_query(toku_lock_tree* tree, toku_range* query,
toku_point* left, toku_point* right) {
__toku_init_point(left, tree, (DBT*)toku_lt_neg_infinity,
tree->duplicates ? (DBT*)toku_lt_neg_infinity : NULL);
......@@ -665,7 +668,7 @@ static void __toku_lt_init_full_query(toku_lock_tree* tree, toku_range* query,
__toku_init_query(query, left, right);
}
static int __toku_lt_free_contents_slow(toku_lock_tree* tree,
inline static int __toku_lt_free_contents_slow(toku_lock_tree* tree,
toku_range_tree* rt,
toku_range_tree* rtdel) {
int r;
......@@ -693,7 +696,7 @@ static int __toku_lt_free_contents_slow(toku_lock_tree* tree,
return r;
}
static int __toku_lt_free_contents(toku_lock_tree* tree, toku_range_tree* rt,
inline static int __toku_lt_free_contents(toku_lock_tree* tree, toku_range_tree* rt,
toku_range_tree *rtdel) {
assert(tree);
if (!rt) return 0;
......@@ -716,7 +719,7 @@ static int __toku_lt_free_contents(toku_lock_tree* tree, toku_range_tree* rt,
return r;
}
static BOOL __toku_r_backwards(toku_range* range) {
inline static BOOL __toku_r_backwards(toku_range* range) {
assert(range && range->left && range->right);
toku_point* left = (toku_point*)range->left;
toku_point* right = (toku_point*)range->right;
......@@ -728,7 +731,7 @@ static BOOL __toku_r_backwards(toku_range* range) {
}
static int __toku_lt_preprocess(toku_lock_tree* tree, DB_TXN* txn,
inline static int __toku_lt_preprocess(toku_lock_tree* tree, DB_TXN* txn,
const DBT* key_left, const DBT** pdata_left,
const DBT* key_right, const DBT** pdata_right,
toku_point* left, toku_point* right,
......@@ -744,12 +747,13 @@ static int __toku_lt_preprocess(toku_lock_tree* tree, DB_TXN* txn,
if (tree->duplicates && key_right != data_right &&
__toku_lt_is_infinite(key_right)) return EINVAL;
int r;
/* Verify that NULL keys have payload and size that are mutually
consistent*/
__toku_lt_verify_null_key(key_left);
__toku_lt_verify_null_key(data_left);
__toku_lt_verify_null_key(key_right);
__toku_lt_verify_null_key(data_right);
if ((r = __toku_lt_verify_null_key(key_left)) != 0) return r;
if ((r = __toku_lt_verify_null_key(data_left)) != 0) return r;
if ((r = __toku_lt_verify_null_key(key_right)) != 0) return r;
if ((r = __toku_lt_verify_null_key(data_right)) != 0) return r;
__toku_init_point(left, tree, key_left, data_left);
__toku_init_point(right, tree, key_right, data_right);
......@@ -758,12 +762,12 @@ static int __toku_lt_preprocess(toku_lock_tree* tree, DB_TXN* txn,
if (__toku_r_backwards(query)) return EDOM;
tree->dups_final = TRUE;
int r = __toku_lt_callback(tree, txn);
r = __toku_lt_callback(tree, txn);
if (r!=0) return r;
return 0;
}
static int __toku_lt_get_border(toku_lock_tree* tree, BOOL in_borderwrite,
inline static int __toku_lt_get_border(toku_lock_tree* tree, BOOL in_borderwrite,
toku_range* pred, toku_range* succ,
BOOL* found_p, BOOL* found_s,
toku_range* to_insert) {
......@@ -780,7 +784,7 @@ static int __toku_lt_get_border(toku_lock_tree* tree, BOOL in_borderwrite,
return 0;
}
static int __toku_lt_expand_border(toku_lock_tree* tree, toku_range* to_insert,
inline static int __toku_lt_expand_border(toku_lock_tree* tree, toku_range* to_insert,
toku_range* pred, toku_range* succ,
BOOL found_p, BOOL found_s) {
assert(tree && to_insert && pred && succ);
......@@ -798,7 +802,7 @@ static int __toku_lt_expand_border(toku_lock_tree* tree, toku_range* to_insert,
return 0;
}
static int __toku_lt_split_border(toku_lock_tree* tree, toku_range* to_insert,
inline static int __toku_lt_split_border(toku_lock_tree* tree, toku_range* to_insert,
toku_range* pred, toku_range* succ,
BOOL found_p, BOOL found_s) {
assert(tree && to_insert && pred && succ);
......@@ -850,7 +854,7 @@ static int __toku_lt_split_border(toku_lock_tree* tree, toku_range* to_insert,
done with borderwrite.
insert point,point into selfwrite.
*/
static int __toku_lt_borderwrite_insert(toku_lock_tree* tree,
inline static int __toku_lt_borderwrite_insert(toku_lock_tree* tree,
toku_range* query,
toku_range* to_insert) {
assert(tree && query && to_insert);
......@@ -1083,7 +1087,13 @@ int toku_lt_acquire_range_write_lock(toku_lock_tree* tree, DB_TXN* txn,
toku_point left;
toku_point right;
toku_range query;
assert(tree);
if (key_left == key_right &&
(data_left == data_right || !tree->duplicates)) {
return toku_lt_acquire_write_lock(tree, txn, key_left, data_left);
}
r = __toku_lt_preprocess(tree, txn, key_left, &data_left,
key_right, &data_right,
&left, &right,
......@@ -1096,7 +1106,7 @@ int toku_lt_acquire_range_write_lock(toku_lock_tree* tree, DB_TXN* txn,
}
static int __toku_sweep_border(toku_lock_tree* tree, toku_range* range) {
inline static int __toku_sweep_border(toku_lock_tree* tree, toku_range* range) {
assert(tree && range);
toku_range_tree* borderwrite = tree->borderwrite;
assert(borderwrite);
......@@ -1164,7 +1174,7 @@ static int __toku_sweep_border(toku_lock_tree* tree, toku_range* range) {
If both found and pred.data=succ.data, merge pred and succ (expand?)
free_points
*/
static int __toku_lt_border_delete(toku_lock_tree* tree, toku_range_tree* rt) {
inline static int __toku_lt_border_delete(toku_lock_tree* tree, toku_range_tree* rt) {
int r;
assert(tree);
if (!rt) return 0;
......
......@@ -514,9 +514,17 @@ static void toku_env_set_errpfx(DB_ENV * env, const char *errpfx) {
static int toku_env_set_flags(DB_ENV * env, u_int32_t flags, int onoff) {
HANDLE_PANICKED_ENV(env);
u_int32_t change = 0;
if (flags & DB_AUTO_COMMIT) {
change |= DB_AUTO_COMMIT;
flags &= ~DB_AUTO_COMMIT;
}
if (flags != 0 && onoff) {
return do_error(env, EINVAL, "TokuDB does not (yet) support any nonzero ENV flags\n");
return do_error(env, EINVAL, "TokuDB does not (yet) support any nonzero ENV flags other than DB_AUTO_COMMIT\n");
}
if (onoff) env->i->open_flags |= change;
else env->i->open_flags &= ~change;
return 0;
}
......@@ -782,20 +790,21 @@ int db_env_create(DB_ENV ** envp, u_int32_t flags) {
static int toku_txn_release_locks(DB_TXN* txn) {
assert(txn);
toku_lth* lth = txn->i->lth;
assert(lth);
int r;
int r2 = 0;
toku_lth_start_scan(lth);
toku_lock_tree* next = toku_lth_next(lth);
while (next) {
r = toku_lt_unlock(next, txn);
if (r!=0 && !r2) r2 = r;
next = toku_lth_next(lth);
}
toku_lth_close(lth);
txn->i->lth = NULL;
return r2;
int r = 0;
if (lth) {
toku_lth_start_scan(lth);
toku_lock_tree* next = toku_lth_next(lth);
int r2;
while (next) {
r2 = toku_lt_unlock(next, txn);
if (r2!=0 && !r) r = r2;
next = toku_lth_next(lth);
}
toku_lth_close(lth);
txn->i->lth = NULL;
}
return r;
}
static int toku_txn_commit(DB_TXN * txn, u_int32_t flags) {
......@@ -864,6 +873,7 @@ static int locked_txn_abort(DB_TXN *txn) {
static int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags) {
HANDLE_PANICKED_ENV(env);
if (!toku_logger_is_open(env->i->logger)) return do_error(env, EINVAL, "Environment does not have logging enabled\n");
if (!(env->i->open_flags & DB_INIT_TXN)) return do_error(env, EINVAL, "Environment does not have transactions enabled\n");
flags=flags;
DB_TXN *MALLOC(result);
if (result == 0)
......@@ -880,12 +890,16 @@ static int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t f
return ENOMEM;
}
result->i->parent = stxn;
int r = toku_lth_create(&result->i->lth, toku_malloc, toku_free, toku_realloc);
if (r!=0) {
toku_free(result->i);
toku_free(result);
return r;
int r;
if (env->i->open_flags & DB_INIT_LOCK) {
r = toku_lth_create(&result->i->lth,
toku_malloc, toku_free, toku_realloc);
if (r!=0) {
toku_free(result->i);
toku_free(result);
return r;
}
}
r = toku_logger_txn_begin(stxn ? stxn->i->tokutxn : 0, &result->i->tokutxn, next_txn++, env->i->logger);
......@@ -1038,16 +1052,14 @@ static int get_main_cursor_flag(u_int32_t flag) {
return flag;
}
int brt_cursor_not_set(BRT_CURSOR cursor);
static BOOL toku_c_uninitialized(DBC* c) {
return brt_cursor_not_set(c->i->c);
}
inline static BOOL toku_c_uninitialized(DBC* c) {
return toku_brt_cursor_uninitialized(c->i->c);
}
static int toku_c_get_current_unconditional(DBC* c, DBT* key, DBT* data) {
assert(!toku_c_uninitialized(c));
memset(key, 0, sizeof(DBT));
memset(data, 0, sizeof(DBT));
data->flags = key->flags = DB_DBT_MALLOC;
TOKUTXN txn = c->i->txn ? c->i->txn->i->tokutxn : NULL;
int r = toku_brt_cursor_get(c->i->c, key, data, DB_CURRENT_BINDING, txn);
......@@ -1102,7 +1114,6 @@ static int toku_c_get_pre_lock(DBC* c, DBT* key, DBT* data, u_int32_t* flag,
r = toku_save_original_data(saved_key, key);
break;
}
//TODO: #warning "Verify our understanding of DB_GET_BOTH_RANGE IS CORRECT HERE"
case (DB_GET_BOTH_RANGE): {
if (!duplicates) {
toku_swap_flag(flag, &get_flag, DB_GET_BOTH); goto get_both; }
......@@ -1199,7 +1210,6 @@ static int toku_c_get_post_lock(DBC* c, DBT* key, DBT* data, u_int32_t flag,
data_r = found ? data : toku_lt_infinity;
break;
}
//TODO: #warning "Verify our understanding of DB_GET_BOTH_RANGE IS CORRECT HERE"
case (DB_GET_BOTH_RANGE): {
key_l = key_r = key;
data_l = saved_data;
......@@ -1272,8 +1282,23 @@ static int toku_c_get_noassociate(DBC * c, DBT * key, DBT * data, u_int32_t flag
}
static int toku_c_del_noassociate(DBC * c, u_int32_t flags) {
HANDLE_PANICKED_DB(c->dbp);
int r = toku_brt_cursor_delete(c->i->c, flags, c->i->txn ? c->i->txn->i->tokutxn : 0);
DB* db = c->dbp;
HANDLE_PANICKED_DB(db);
if (toku_c_uninitialized(c)) return EINVAL;
int r;
if (db->i->lt) {
DBT saved_key;
DBT saved_data;
r = toku_c_get_current_unconditional(c, &saved_key, &saved_data);
if (r!=0) return r;
r = toku_lt_acquire_write_lock(db->i->lt, c->i->txn,
&saved_key, &saved_data);
if (saved_key.data) toku_free(saved_key.data);
if (saved_data.data) toku_free(saved_data.data);
if (r!=0) return r;
}
r = toku_brt_cursor_delete(c->i->c, flags, c->i->txn ? c->i->txn->i->tokutxn : 0);
return r;
}
......@@ -1488,6 +1513,12 @@ static int toku_db_del_noassociate(DB * db, DB_TXN * txn, DBT * key, u_int32_t f
toku_free(search_val.data);
}
//Do the actual deleting.
if (db->i->lt) {
r = toku_lt_acquire_range_write_lock(db->i->lt, txn,
key, toku_lt_neg_infinity,
key, toku_lt_infinity);
if (r!=0) return r;
}
r = toku_brt_delete(db->i->brt, key, txn ? txn->i->tokutxn : 0);
return r;
}
......@@ -2067,6 +2098,7 @@ static int toku_db_put_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data,
r = toku_db_get_noassociate(db, txn, key, toku_init_dbt(&testfordata), 0);
if (r == 0)
return DB_KEYEXIST;
if (r != DB_NOTFOUND) return r;
} else if (flags != 0) {
/* no other flags are currently supported */
return EINVAL;
......@@ -2077,6 +2109,7 @@ static int toku_db_put_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data,
r = toku_db_get_noassociate(db, txn, key, data, DB_GET_BOTH);
if (r == 0)
return DB_KEYEXIST;
if (r != DB_NOTFOUND) return r;
#else
return do_error(db->dbenv, EINVAL, "Tokudb requires that db->put specify DB_YESOVERWRITE or DB_NOOVERWRITE on DB_DUPSORT databases");
#endif
......@@ -2244,37 +2277,117 @@ static int toku_db_fd(DB *db, int *fdp) {
#if _THREAD_SAFE
//TODO: DB_AUTO_COMMIT.
//TODO: Nowait only conditionally?
//TODO: NOSYNC change to SYNC if DB_ENV has something in set_flags
inline static int toku_db_construct_autotxn(DB* db, DB_TXN **txn, BOOL* changed,
BOOL force_auto_commit) {
assert(db && txn && changed);
DB_ENV* env = db->dbenv;
if (*txn || !(env->i->open_flags & DB_INIT_TXN)) {
*changed = FALSE;
return 0;
}
BOOL nosync = !force_auto_commit && !(env->i->open_flags & DB_AUTO_COMMIT);
u_int32_t txn_flags = DB_TXN_NOWAIT | (nosync ? DB_TXN_NOSYNC : 0);
int r = toku_txn_begin(env, NULL, txn, txn_flags);
if (r!=0) return r;
*changed = TRUE;
return 0;
}
inline static int toku_db_destruct_autotxn(DB_TXN *txn, int r, BOOL changed) {
if (!changed) return r;
if (r==0) return toku_txn_commit(txn, 0);
toku_txn_abort(txn);
return r;
}
inline static int autotxn_db_associate(DB *primary, DB_TXN *txn, DB *secondary,
int (*callback)(DB *secondary, const DBT *key, const DBT *data, DBT *result), u_int32_t flags) {
BOOL changed; int r;
r = toku_db_construct_autotxn(primary, &txn, &changed, FALSE);
if (r!=0) return r;
r = toku_db_associate(primary, txn, secondary, callback, flags);
return toku_db_destruct_autotxn(txn, r, changed);
}
static int locked_db_associate (DB *primary, DB_TXN *txn, DB *secondary,
int (*callback)(DB *secondary, const DBT *key, const DBT *data, DBT *result), u_int32_t flags) {
ydb_lock(); int r = toku_db_associate(primary, txn, secondary, callback, flags); ydb_unlock(); return r;
ydb_lock(); int r = autotxn_db_associate(primary, txn, secondary, callback, flags); ydb_unlock(); return r;
}
static int locked_db_close(DB * db, u_int32_t flags) {
ydb_lock(); int r = toku_db_close(db, flags); ydb_unlock(); return r;
}
//TODO: Something about the cursor with no txn.. EINVAL maybe?
static int locked_db_cursor(DB *db, DB_TXN *txn, DBC **c, u_int32_t flags) {
ydb_lock(); int r = toku_db_cursor(db, txn, c, flags); ydb_unlock(); return r;
}
inline static int autotxn_db_del(DB* db, DB_TXN* txn, DBT* key,
u_int32_t flags) {
BOOL changed; int r;
r = toku_db_construct_autotxn(db, &txn, &changed, FALSE);
if (r!=0) return r;
r = toku_db_del(db, txn, key, flags);
return toku_db_destruct_autotxn(txn, r, changed);
}
static int locked_db_del(DB * db, DB_TXN * txn, DBT * key, u_int32_t flags) {
ydb_lock(); int r = toku_db_del(db, txn, key, flags); ydb_unlock(); return r;
ydb_lock(); int r = autotxn_db_del(db, txn, key, flags); ydb_unlock(); return r;
}
inline static int autotxn_db_get(DB* db, DB_TXN* txn, DBT* key, DBT* data,
u_int32_t flags) {
BOOL changed; int r;
r = toku_db_construct_autotxn(db, &txn, &changed, FALSE);
if (r!=0) return r;
r = toku_db_get(db, txn, key, data, flags);
return toku_db_destruct_autotxn(txn, r, changed);
}
static int locked_db_get (DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags) {
ydb_lock(); int r = toku_db_get(db, txn, key, data, flags); ydb_unlock(); return r;
ydb_lock(); int r = autotxn_db_get(db, txn, key, data, flags); ydb_unlock(); return r;
}
inline static int autotxn_db_pget(DB* db, DB_TXN* txn, DBT* key, DBT* pkey,
DBT* data, u_int32_t flags) {
BOOL changed; int r;
r = toku_db_construct_autotxn(db, &txn, &changed, FALSE);
if (r!=0) return r;
r = toku_db_pget(db, txn, key, pkey, data, flags);
return toku_db_destruct_autotxn(txn, r, changed);
}
static int locked_db_pget (DB *db, DB_TXN *txn, DBT *key, DBT *pkey, DBT *data, u_int32_t flags) {
ydb_lock(); int r = toku_db_pget(db, txn, key, pkey, data, flags); ydb_unlock(); return r;
ydb_lock(); int r = autotxn_db_pget(db, txn, key, pkey, data, flags); ydb_unlock(); return r;
}
inline static int autotxn_db_open(DB* db, DB_TXN* txn, const char *fname, const char *dbname, DBTYPE dbtype, u_int32_t flags, int mode) {
BOOL changed; int r;
r = toku_db_construct_autotxn(db, &txn, &changed, flags & DB_AUTO_COMMIT);
if (r!=0) return r;
r = toku_db_open(db, txn, fname, dbname, dbtype, flags & ~DB_AUTO_COMMIT, mode);
return toku_db_destruct_autotxn(txn, r, changed);
}
static int locked_db_open(DB *db, DB_TXN *txn, const char *fname, const char *dbname, DBTYPE dbtype, u_int32_t flags, int mode) {
ydb_lock(); int r = toku_db_open(db, txn, fname, dbname, dbtype, flags, mode); ydb_unlock(); return r;
}
inline static int autotxn_db_put(DB* db, DB_TXN* txn, DBT* key, DBT* data,
u_int32_t flags) {
BOOL changed; int r;
r = toku_db_construct_autotxn(db, &txn, &changed, FALSE);
if (r!=0) return r;
r = toku_db_put(db, txn, key, data, flags);
return toku_db_destruct_autotxn(txn, r, changed);
}
static int locked_db_put(DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags) {
ydb_lock(); int r = toku_db_put(db, txn, key, data, flags); ydb_unlock(); return r;
ydb_lock(); int r = autotxn_db_put(db, txn, key, data, flags); ydb_unlock(); return r;
}
static int locked_db_remove(DB * db, const char *fname, const char *dbname, u_int32_t flags) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment