Commit 62c34b76 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

#3045 merge update_multiple bug fixes to main refs[t:3045]

git-svn-id: file:///svn/toku/tokudb@25722 c7de825b-a66e-492c-adef-691d508d4ae1
parent c28309a8
......@@ -293,22 +293,19 @@ struct __toku_db_env {
int (*create_loader) (DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t db_flags[/*N*/], uint32_t dbt_flags[/*N*/], uint32_t loader_flags);
int (*create_indexer) (DB_ENV *env, DB_TXN *txn, DB_INDEXER **idxrp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t db_flags[/*N*/], uint32_t indexer_flags);
int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array
) /* insert into multiple DBs */;
const DBT *src_key, const DBT *src_val,
uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array) /* insert into multiple DBs */;
int (*set_generate_row_callback_for_put) (DB_ENV *env, generate_row_for_put_func generate_row_for_put);
int (*del_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array
) /* delete from multiple DBs */;
const DBT *src_key, const DBT *src_val,
uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array) /* delete from multiple DBs */;
int (*set_generate_row_callback_for_del) (DB_ENV *env, generate_row_for_del_func generate_row_for_del);
int (*update_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
DBT *old_src_key, DBT *old_src_data,
DBT *new_src_key, DBT *new_src_data,
uint32_t num_dbs, DB **db_array, uint32_t *flags_array,
uint32_t num_keys, DBT *keys,
uint32_t num_vals, DBT *vals
) /* update multiple DBs */;
uint32_t num_vals, DBT *vals) /* update multiple DBs */;
int (*get_redzone) (DB_ENV *env, int *redzone) /* get the redzone limit */;
int (*set_redzone) (DB_ENV *env, int redzone) /* set the redzone limit in percent of total space */;
int (*set_lk_max_memory) (DB_ENV *env, uint64_t max);
......
......@@ -295,22 +295,19 @@ struct __toku_db_env {
int (*create_loader) (DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t db_flags[/*N*/], uint32_t dbt_flags[/*N*/], uint32_t loader_flags);
int (*create_indexer) (DB_ENV *env, DB_TXN *txn, DB_INDEXER **idxrp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t db_flags[/*N*/], uint32_t indexer_flags);
int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array
) /* insert into multiple DBs */;
const DBT *src_key, const DBT *src_val,
uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array) /* insert into multiple DBs */;
int (*set_generate_row_callback_for_put) (DB_ENV *env, generate_row_for_put_func generate_row_for_put);
int (*del_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array
) /* delete from multiple DBs */;
const DBT *src_key, const DBT *src_val,
uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array) /* delete from multiple DBs */;
int (*set_generate_row_callback_for_del) (DB_ENV *env, generate_row_for_del_func generate_row_for_del);
int (*update_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
DBT *old_src_key, DBT *old_src_data,
DBT *new_src_key, DBT *new_src_data,
uint32_t num_dbs, DB **db_array, uint32_t *flags_array,
uint32_t num_keys, DBT *keys,
uint32_t num_vals, DBT *vals
) /* update multiple DBs */;
uint32_t num_vals, DBT *vals) /* update multiple DBs */;
int (*get_redzone) (DB_ENV *env, int *redzone) /* get the redzone limit */;
int (*set_redzone) (DB_ENV *env, int redzone) /* set the redzone limit in percent of total space */;
int (*set_lk_max_memory) (DB_ENV *env, uint64_t max);
......
......@@ -295,22 +295,19 @@ struct __toku_db_env {
int (*create_loader) (DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t db_flags[/*N*/], uint32_t dbt_flags[/*N*/], uint32_t loader_flags);
int (*create_indexer) (DB_ENV *env, DB_TXN *txn, DB_INDEXER **idxrp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t db_flags[/*N*/], uint32_t indexer_flags);
int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array
) /* insert into multiple DBs */;
const DBT *src_key, const DBT *src_val,
uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array) /* insert into multiple DBs */;
int (*set_generate_row_callback_for_put) (DB_ENV *env, generate_row_for_put_func generate_row_for_put);
int (*del_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array
) /* delete from multiple DBs */;
const DBT *src_key, const DBT *src_val,
uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array) /* delete from multiple DBs */;
int (*set_generate_row_callback_for_del) (DB_ENV *env, generate_row_for_del_func generate_row_for_del);
int (*update_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
DBT *old_src_key, DBT *old_src_data,
DBT *new_src_key, DBT *new_src_data,
uint32_t num_dbs, DB **db_array, uint32_t *flags_array,
uint32_t num_keys, DBT *keys,
uint32_t num_vals, DBT *vals
) /* update multiple DBs */;
uint32_t num_vals, DBT *vals) /* update multiple DBs */;
int (*get_redzone) (DB_ENV *env, int *redzone) /* get the redzone limit */;
int (*set_redzone) (DB_ENV *env, int redzone) /* set the redzone limit in percent of total space */;
int (*set_lk_max_memory) (DB_ENV *env, uint64_t max);
......
......@@ -295,22 +295,19 @@ struct __toku_db_env {
int (*create_indexer) (DB_ENV *env, DB_TXN *txn, DB_INDEXER **idxrp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t db_flags[/*N*/], uint32_t indexer_flags);
void *app_private; /* 32-bit offset=52 size=4, 64=bit offset=104 size=8 */
int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array
) /* insert into multiple DBs */;
const DBT *src_key, const DBT *src_val,
uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array) /* insert into multiple DBs */;
int (*set_generate_row_callback_for_put) (DB_ENV *env, generate_row_for_put_func generate_row_for_put);
int (*del_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array
) /* delete from multiple DBs */;
const DBT *src_key, const DBT *src_val,
uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array) /* delete from multiple DBs */;
int (*set_generate_row_callback_for_del) (DB_ENV *env, generate_row_for_del_func generate_row_for_del);
int (*update_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
DBT *old_src_key, DBT *old_src_data,
DBT *new_src_key, DBT *new_src_data,
uint32_t num_dbs, DB **db_array, uint32_t *flags_array,
uint32_t num_keys, DBT *keys,
uint32_t num_vals, DBT *vals
) /* update multiple DBs */;
uint32_t num_vals, DBT *vals) /* update multiple DBs */;
int (*get_redzone) (DB_ENV *env, int *redzone) /* get the redzone limit */;
int (*set_redzone) (DB_ENV *env, int redzone) /* set the redzone limit in percent of total space */;
int (*set_lk_max_memory) (DB_ENV *env, uint64_t max);
......
......@@ -296,22 +296,19 @@ struct __toku_db_env {
int (*create_indexer) (DB_ENV *env, DB_TXN *txn, DB_INDEXER **idxrp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t db_flags[/*N*/], uint32_t indexer_flags);
void *app_private; /* 32-bit offset=52 size=4, 64=bit offset=104 size=8 */
int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array
) /* insert into multiple DBs */;
const DBT *src_key, const DBT *src_val,
uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array) /* insert into multiple DBs */;
int (*set_generate_row_callback_for_put) (DB_ENV *env, generate_row_for_put_func generate_row_for_put);
int (*del_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array
) /* delete from multiple DBs */;
const DBT *src_key, const DBT *src_val,
uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array) /* delete from multiple DBs */;
int (*set_generate_row_callback_for_del) (DB_ENV *env, generate_row_for_del_func generate_row_for_del);
int (*update_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
DBT *old_src_key, DBT *old_src_data,
DBT *new_src_key, DBT *new_src_data,
uint32_t num_dbs, DB **db_array, uint32_t *flags_array,
uint32_t num_keys, DBT *keys,
uint32_t num_vals, DBT *vals
) /* update multiple DBs */;
uint32_t num_vals, DBT *vals) /* update multiple DBs */;
int (*get_redzone) (DB_ENV *env, int *redzone) /* get the redzone limit */;
int (*set_redzone) (DB_ENV *env, int redzone) /* set the redzone limit in percent of total space */;
int (*set_lk_max_memory) (DB_ENV *env, uint64_t max);
......
......@@ -597,22 +597,19 @@ int main (int argc __attribute__((__unused__)), char *const argv[] __attribute__
"int (*create_loader) (DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t db_flags[/*N*/], uint32_t dbt_flags[/*N*/], uint32_t loader_flags)",
"int (*create_indexer) (DB_ENV *env, DB_TXN *txn, DB_INDEXER **idxrp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t db_flags[/*N*/], uint32_t indexer_flags)",
"int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,\n"
" const DBT *key, const DBT *val,\n"
" uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array\n"
" ) /* insert into multiple DBs */",
" const DBT *src_key, const DBT *src_val,\n"
" uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array) /* insert into multiple DBs */",
"int (*set_generate_row_callback_for_put) (DB_ENV *env, generate_row_for_put_func generate_row_for_put)",
"int (*del_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,\n"
" const DBT *key, const DBT *val,\n"
" uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array\n"
" ) /* delete from multiple DBs */",
" const DBT *src_key, const DBT *src_val,\n"
" uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array) /* delete from multiple DBs */",
"int (*set_generate_row_callback_for_del) (DB_ENV *env, generate_row_for_del_func generate_row_for_del)",
"int (*update_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,\n"
" DBT *old_src_key, DBT *old_src_data,\n"
" DBT *new_src_key, DBT *new_src_data,\n"
" uint32_t num_dbs, DB **db_array, uint32_t *flags_array,\n"
" uint32_t num_keys, DBT *keys,\n"
" uint32_t num_vals, DBT *vals\n"
" ) /* update multiple DBs */",
" uint32_t num_vals, DBT *vals) /* update multiple DBs */",
"int (*get_redzone) (DB_ENV *env, int *redzone) /* get the redzone limit */",
"int (*set_redzone) (DB_ENV *env, int redzone) /* set the redzone limit in percent of total space */",
"int (*set_lk_max_memory) (DB_ENV *env, uint64_t max)",
......
......@@ -296,22 +296,19 @@ struct __toku_db_env {
int (*create_indexer) (DB_ENV *env, DB_TXN *txn, DB_INDEXER **idxrp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t db_flags[/*N*/], uint32_t indexer_flags);
void *app_private;
int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array
) /* insert into multiple DBs */;
const DBT *src_key, const DBT *src_val,
uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array) /* insert into multiple DBs */;
int (*set_generate_row_callback_for_put) (DB_ENV *env, generate_row_for_put_func generate_row_for_put);
int (*del_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array
) /* delete from multiple DBs */;
const DBT *src_key, const DBT *src_val,
uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array) /* delete from multiple DBs */;
int (*set_generate_row_callback_for_del) (DB_ENV *env, generate_row_for_del_func generate_row_for_del);
int (*update_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
DBT *old_src_key, DBT *old_src_data,
DBT *new_src_key, DBT *new_src_data,
uint32_t num_dbs, DB **db_array, uint32_t *flags_array,
uint32_t num_keys, DBT *keys,
uint32_t num_vals, DBT *vals
) /* update multiple DBs */;
uint32_t num_vals, DBT *vals) /* update multiple DBs */;
int (*get_redzone) (DB_ENV *env, int *redzone) /* get the redzone limit */;
int (*set_redzone) (DB_ENV *env, int redzone) /* set the redzone limit in percent of total space */;
int (*set_lk_max_memory) (DB_ENV *env, uint64_t max);
......
......@@ -296,22 +296,19 @@ struct __toku_db_env {
int (*create_indexer) (DB_ENV *env, DB_TXN *txn, DB_INDEXER **idxrp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t db_flags[/*N*/], uint32_t indexer_flags);
void *app_private;
int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array
) /* insert into multiple DBs */;
const DBT *src_key, const DBT *src_val,
uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array) /* insert into multiple DBs */;
int (*set_generate_row_callback_for_put) (DB_ENV *env, generate_row_for_put_func generate_row_for_put);
int (*del_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array
) /* delete from multiple DBs */;
const DBT *src_key, const DBT *src_val,
uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array) /* delete from multiple DBs */;
int (*set_generate_row_callback_for_del) (DB_ENV *env, generate_row_for_del_func generate_row_for_del);
int (*update_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
DBT *old_src_key, DBT *old_src_data,
DBT *new_src_key, DBT *new_src_data,
uint32_t num_dbs, DB **db_array, uint32_t *flags_array,
uint32_t num_keys, DBT *keys,
uint32_t num_vals, DBT *vals
) /* update multiple DBs */;
uint32_t num_vals, DBT *vals) /* update multiple DBs */;
int (*get_redzone) (DB_ENV *env, int *redzone) /* get the redzone limit */;
int (*set_redzone) (DB_ENV *env, int redzone) /* set the redzone limit in percent of total space */;
int (*set_lk_max_memory) (DB_ENV *env, uint64_t max);
......
......@@ -90,6 +90,7 @@ BDB_DONTRUN_TESTS = \
del-simple \
del-multiple \
del-multiple-huge-primary-row \
del-multiple-srcdb \
directory_lock \
diskfull \
env-put-multiple \
......@@ -194,9 +195,10 @@ BDB_DONTRUN_TESTS = \
test_txn_nested5 \
txn-ignore \
transactional_fileops \
update-multiple-nochange \
update-multiple-key0 \
update-multiple-data-diagonal \
update-multiple-key0 \
update-multiple-nochange \
update-multiple-with-indexer \
upgrade_simple \
upgrade-test-1 \
upgrade-test-2 \
......
#include "test.h"
// verify that del_multiple deletes the correct key from N dictionaries
// verify that del_multiple locks the correct key for N dictionaries
static int
get_key(int i, int dbnum) {
return htonl(i + dbnum);
}
static void
get_data(int *v, int i, int ndbs) {
for (int dbnum = 0; dbnum < ndbs; dbnum++) {
v[dbnum] = get_key(i, dbnum);
}
}
static int
del_callback(DB *dest_db, DB *src_db, DBT *dest_key, const DBT *src_key, const DBT *src_data) {
dest_db = dest_db; src_db = src_db; dest_key = dest_key; src_key = src_key; src_data = src_data;
unsigned int dbnum;
assert(dest_db->descriptor->dbt.size == sizeof dbnum);
memcpy(&dbnum, dest_db->descriptor->dbt.data, sizeof dbnum);
assert(dbnum < src_data->size / sizeof (int));
int *pri_data = (int *) src_data->data;
assert(dest_key->flags == 0);
dest_key->size = sizeof (int);
dest_key->data = &pri_data[dbnum];
return 0;
}
static void
verify_locked(DB_ENV *env, DB *db, int k) {
int r;
DB_TXN *txn = NULL;
r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
DBT key; dbt_init(&key, &k, sizeof k);
r = db->del(db, txn, &key, DB_DELETE_ANY); assert(r == DB_LOCK_NOTGRANTED);
r = txn->abort(txn); assert_zero(r);
}
static void
verify_empty(DB_ENV *env, DB *db) {
int r;
DB_TXN *txn = NULL;
r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
DBC *cursor = NULL;
r = db->cursor(db, txn, &cursor, 0); assert_zero(r);
int i;
for (i = 0; ; i++) {
DBT key; memset(&key, 0, sizeof key);
DBT val; memset(&val, 0, sizeof val);
r = cursor->c_get(cursor, &key, &val, DB_NEXT);
if (r != 0)
break;
}
assert_zero(i);
r = cursor->c_close(cursor); assert_zero(r);
r = txn->commit(txn, 0); assert_zero(r);
}
static void
verify_del_multiple(DB_ENV *env, DB *db[], int ndbs, int nrows) {
int r;
DB_TXN *deltxn = NULL;
r = env->txn_begin(env, NULL, &deltxn, 0); assert_zero(r);
for (int i = 0; i < nrows; i++) {
int k = get_key(i, 0);
DBT pri_key; dbt_init(&pri_key, &k, sizeof k);
int v[ndbs]; get_data(v, i, ndbs);
DBT pri_data; dbt_init(&pri_data, &v[0], sizeof v);
DBT keys[ndbs]; memset(keys, 0, sizeof keys);
uint32_t flags[ndbs]; memset(flags, 0, sizeof flags);
r = env->del_multiple(env, ndbs > 0 ? db[0] : NULL, deltxn, &pri_key, &pri_data, ndbs, db, keys, flags); assert_zero(r);
for (int dbnum = 0; dbnum < ndbs; dbnum++)
verify_locked(env, db[dbnum], get_key(i, dbnum));
}
r = deltxn->commit(deltxn, 0); assert_zero(r);
for (int dbnum = 0; dbnum < ndbs; dbnum++)
verify_empty(env, db[dbnum]);
}
static void
populate_primary(DB_ENV *env, DB *db, int ndbs, int nrows) {
int r;
DB_TXN *txn = NULL;
r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
// populate
for (int i = 0; i < nrows; i++) {
int k = get_key(i, 0);
int v[ndbs]; get_data(v, i, ndbs);
DBT key; dbt_init(&key, &k, sizeof k);
DBT val; dbt_init(&val, &v[0], sizeof v);
r = db->put(db, txn, &key, &val, DB_YESOVERWRITE); assert_zero(r);
}
r = txn->commit(txn, 0); assert_zero(r);
}
static void
populate_secondary(DB_ENV *env, DB *db, int dbnum, int nrows) {
int r;
DB_TXN *txn = NULL;
r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
// populate
for (int i = 0; i < nrows; i++) {
int k = get_key(i, dbnum);
DBT key; dbt_init(&key, &k, sizeof k);
DBT val; dbt_init(&val, NULL, 0);
r = db->put(db, txn, &key, &val, DB_YESOVERWRITE); assert_zero(r);
}
r = txn->commit(txn, 0); assert_zero(r);
}
static void
run_test(int ndbs, int nrows) {
int r;
DB_ENV *env = NULL;
r = db_env_create(&env, 0); assert_zero(r);
r = env->set_generate_row_callback_for_del(env, del_callback); assert_zero(r);
r = env->open(env, ENVDIR, DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
DB *db[ndbs];
for (int dbnum = 0; dbnum < ndbs; dbnum++) {
r = db_create(&db[dbnum], env, 0); assert_zero(r);
DBT dbt_dbnum; dbt_init(&dbt_dbnum, &dbnum, sizeof dbnum);
r = db[dbnum]->set_descriptor(db[dbnum], 1, &dbt_dbnum); assert_zero(r);
char dbname[32]; sprintf(dbname, "%d.tdb", dbnum);
r = db[dbnum]->open(db[dbnum], NULL, dbname, NULL, DB_BTREE, DB_AUTO_COMMIT+DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
}
for (int dbnum = 0; dbnum < ndbs; dbnum++) {
if (dbnum == 0)
populate_primary(env, db[dbnum], ndbs, nrows);
else
populate_secondary(env, db[dbnum], dbnum, nrows);
}
verify_del_multiple(env, db, ndbs, nrows);
for (int dbnum = 0; dbnum < ndbs; dbnum++)
r = db[dbnum]->close(db[dbnum], 0); assert_zero(r);
r = env->close(env, 0); assert_zero(r);
}
int
test_main(int argc, char * const argv[]) {
int r;
int ndbs = 2;
int nrows = 2;
// parse_args(argc, argv);
for (int i = 1; i < argc; i++) {
char * const arg = argv[i];
if (strcmp(arg, "-v") == 0) {
verbose++;
continue;
}
if (strcmp(arg, "-q") == 0) {
verbose = 0;
continue;
}
if (strcmp(arg, "--ndbs") == 0 && i+1 < argc) {
ndbs = atoi(argv[++i]);
continue;
}
if (strcmp(arg, "--nrows") == 0 && i+1 < argc) {
nrows = atoi(argv[++i]);
continue;
}
}
r = system("rm -rf " ENVDIR); assert_zero(r);
r = toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
run_test(ndbs, nrows);
return 0;
}
......@@ -32,7 +32,6 @@ get_new_data(int *v, int i, int ndbs) {
static int
put_callback(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_data, const DBT *src_key, const DBT *src_data) {
dest_db = dest_db; src_db = src_db; dest_key = dest_key; dest_data = dest_data; src_key = src_key; src_data = src_data;
assert(src_db == NULL);
unsigned int dbnum;
assert(dest_db->descriptor->dbt.size == sizeof dbnum);
......@@ -182,7 +181,7 @@ update_diagonal(DB_ENV *env, DB *db[], int ndbs, int nrows) {
DBT vals[ndbts]; memset(vals, 0, sizeof vals);
uint32_t flags_array[ndbs]; memset(flags_array, 0, sizeof(flags_array));
r = env->update_multiple(env, NULL, txn, &old_key, &old_data, &new_key, &new_data, ndbs, db, flags_array, ndbts, keys, ndbts, vals);
r = env->update_multiple(env, ndbs > 0 ? db[0] : NULL, txn, &old_key, &old_data, &new_key, &new_data, ndbs, db, flags_array, ndbts, keys, ndbts, vals);
assert_zero(r);
}
r = txn->commit(txn, 0); assert_zero(r);
......
......@@ -17,7 +17,6 @@ get_data(int *v, int i, int ndbs) {
static int
put_callback(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_data, const DBT *src_key, const DBT *src_data) {
dest_db = dest_db; src_db = src_db; dest_key = dest_key; dest_data = dest_data; src_key = src_key; src_data = src_data;
assert(src_db == NULL);
unsigned int dbnum;
assert(dest_db->descriptor->dbt.size == sizeof dbnum);
......@@ -161,7 +160,7 @@ update_key0(DB_ENV *env, DB *db[], int ndbs, int nrows) {
DBT vals[ndbts]; memset(vals, 0, sizeof vals);
uint32_t flags_array[ndbs]; memset(flags_array, 0, sizeof(flags_array));
r = env->update_multiple(env, NULL, txn, &old_key, &old_data, &new_key, &new_data, ndbs, db, flags_array, ndbts, keys, ndbts, vals);
r = env->update_multiple(env, ndbs > 0 ? db[0] : NULL, txn, &old_key, &old_data, &new_key, &new_data, ndbs, db, flags_array, ndbts, keys, ndbts, vals);
assert_zero(r);
verify_locked(env, db[0], k);
......
......@@ -17,7 +17,6 @@ get_data(int *v, int i, int ndbs) {
static int
put_callback(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_data, const DBT *src_key, const DBT *src_data) {
dest_db = dest_db; src_db = src_db; dest_key = dest_key; dest_data = dest_data; src_key = src_key; src_data = src_data;
assert(src_db == NULL);
unsigned int dbnum;
assert(dest_db->descriptor->dbt.size == sizeof dbnum);
......@@ -156,7 +155,7 @@ verify(DB_ENV *env, DB *db[], int ndbs, int nrows) {
DBT vals[ndbts]; memset(vals, 0, sizeof vals);
uint32_t flags_array[ndbs]; memset(flags_array, 0, sizeof(flags_array));
r = env->update_multiple(env, NULL, txn, &old_key, &old_data, &new_key, &new_data, ndbs, db, flags_array, ndbts, keys, ndbts, vals);
r = env->update_multiple(env, ndbs > 0 ? db[0] : NULL, txn, &old_key, &old_data, &new_key, &new_data, ndbs, db, flags_array, ndbts, keys, ndbts, vals);
assert_zero(r);
}
r = txn->commit(txn, 0); assert_zero(r);
......
#include "test.h"
// verify that update_multiple where we change the data in row[i] col[j] from x to x+1
static int
get_key(int i, int dbnum) {
return htonl(2*(i + dbnum));
}
static int
get_new_key(int i, int dbnum) {
return htonl(2*(i + dbnum) + 1);
}
static void
get_data(int *v, int i, int ndbs) {
for (int dbnum = 0; dbnum < ndbs; dbnum++) {
v[dbnum] = get_key(i, dbnum);
}
}
static void
get_new_data(int *v, int i, int ndbs) {
for (int dbnum = 0; dbnum < ndbs; dbnum++) {
if ((i % ndbs) == dbnum)
v[dbnum] = get_new_key(i, dbnum);
else
v[dbnum] = get_key(i, dbnum);
}
}
static int
put_callback(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_data, const DBT *src_key, const DBT *src_data) {
dest_db = dest_db; src_db = src_db; dest_key = dest_key; dest_data = dest_data; src_key = src_key; src_data = src_data;
unsigned int dbnum;
assert(dest_db->descriptor->dbt.size == sizeof dbnum);
memcpy(&dbnum, dest_db->descriptor->dbt.data, sizeof dbnum);
assert(dbnum < src_data->size / sizeof (int));
int *pri_key = (int *) src_key->data;
int *pri_data = (int *) src_data->data;
switch (dest_key->flags) {
case 0:
dest_key->size = sizeof (int);
dest_key->data = dbnum == 0 ? &pri_key[dbnum] : &pri_data[dbnum];
break;
case DB_DBT_REALLOC:
dest_key->size = sizeof (int);
dest_key->data = toku_realloc(dest_key->data, dest_key->size);
memcpy(dest_key->data, dbnum == 0 ? &pri_key[dbnum] : &pri_data[dbnum], dest_key->size);
break;
default:
assert(0);
}
if (dest_data) {
switch (dest_data->flags) {
case 0:
if (dbnum == 0) {
dest_data->size = src_data->size;
dest_data->data = src_data->data;
} else
dest_data->size = 0;
break;
case DB_DBT_REALLOC:
if (dbnum == 0) {
dest_data->size = src_data->size;
dest_data->data = toku_realloc(dest_data->data, dest_data->size);
memcpy(dest_data->data, src_data->data, dest_data->size);
} else
dest_data->size = 0;
break;
default:
assert(0);
}
}
return 0;
}
static int
del_callback(DB *dest_db, DB *src_db, DBT *dest_key, const DBT *src_key, const DBT *src_data) {
return put_callback(dest_db, src_db, dest_key, NULL, src_key, src_data);
}
#if 0
static void
verify_locked(DB_ENV *env, DB *db, int k) {
int r;
DB_TXN *txn = NULL;
r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
DBT key; dbt_init(&key, &k, sizeof k);
r = db->del(db, txn, &key, DB_DELETE_ANY); assert(r == DB_LOCK_NOTGRANTED);
r = txn->abort(txn); assert_zero(r);
}
static void
verify_empty(DB_ENV *env, DB *db) {
int r;
DB_TXN *txn = NULL;
r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
DBC *cursor = NULL;
r = db->cursor(db, txn, &cursor, 0); assert_zero(r);
int i;
for (i = 0; ; i++) {
DBT key; memset(&key, 0, sizeof key);
DBT val; memset(&val, 0, sizeof val);
r = cursor->c_get(cursor, &key, &val, DB_NEXT);
if (r != 0)
break;
}
assert_zero(i);
r = cursor->c_close(cursor); assert_zero(r);
r = txn->commit(txn, 0); assert_zero(r);
}
#endif
static void
verify_seq(DB_ENV *env, DB *db, int dbnum, int ndbs, int nrows) {
int r;
DB_TXN *txn = NULL;
r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
DBC *cursor = NULL;
r = db->cursor(db, txn, &cursor, 0); assert_zero(r);
int i;
for (i = 0; ; i++) {
DBT key; memset(&key, 0, sizeof key);
DBT val; memset(&val, 0, sizeof val);
r = cursor->c_get(cursor, &key, &val, DB_NEXT);
if (r != 0)
break;
int k;
int expectk;
if (dbnum == 0 || (i % ndbs) != dbnum)
expectk = get_key(i, dbnum);
else
expectk = get_new_key(i, dbnum);
assert(key.size == sizeof k);
memcpy(&k, key.data, key.size);
assert(k == expectk);
if (dbnum == 0) {
assert(val.size == ndbs * sizeof (int));
int v[ndbs]; get_new_data(v, i, ndbs);
assert(memcmp(val.data, v, val.size) == 0);
} else
assert(val.size == 0);
}
assert(i == nrows); // if (i != nrows) printf("%s:%d %d %d\n", __FUNCTION__, __LINE__, i, nrows); // assert(i == nrows);
r = cursor->c_close(cursor); assert_zero(r);
r = txn->commit(txn, 0); assert_zero(r);
}
static void
update_diagonal(DB_ENV *env, DB *db[], int ndbs, int nrows) {
assert(ndbs > 0);
int r;
DB_TXN *txn = NULL;
r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
for (int i = 0; i < nrows; i++) {
// update the data i % ndbs col from x to x+1
int k = get_key(i, 0);
DBT old_key; dbt_init(&old_key, &k, sizeof k);
DBT new_key = old_key;
int v[ndbs]; get_data(v, i, ndbs);
DBT old_data; dbt_init(&old_data, &v[0], sizeof v);
int newv[ndbs]; get_new_data(newv, i, ndbs);
DBT new_data; dbt_init(&new_data, &newv[0], sizeof newv);
int ndbts = 2 * ndbs;
DBT keys[ndbts]; memset(keys, 0, sizeof keys);
DBT vals[ndbts]; memset(vals, 0, sizeof vals);
uint32_t flags_array[ndbs]; memset(flags_array, 0, sizeof(flags_array));
r = env->update_multiple(env, ndbs > 0 ? db[0] : NULL, txn, &old_key, &old_data, &new_key, &new_data, ndbs, db, flags_array, ndbts, keys, ndbts, vals);
assert_zero(r);
}
r = txn->commit(txn, 0); assert_zero(r);
}
static void
populate_primary(DB_ENV *env, DB *db, int ndbs, int nrows) {
int r;
DB_TXN *txn = NULL;
r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
// populate
for (int i = 0; i < nrows; i++) {
int k = get_key(i, 0);
int v[ndbs]; get_data(v, i, ndbs);
DBT key; dbt_init(&key, &k, sizeof k);
DBT val; dbt_init(&val, &v[0], sizeof v);
r = db->put(db, txn, &key, &val, DB_YESOVERWRITE); assert_zero(r);
}
r = txn->commit(txn, 0); assert_zero(r);
}
static void
populate_secondary(DB_ENV *env, DB *db, int dbnum, int nrows) {
int r;
DB_TXN *txn = NULL;
r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
// populate
for (int i = 0; i < nrows; i++) {
int k = get_key(i, dbnum);
DBT key; dbt_init(&key, &k, sizeof k);
DBT val; dbt_init(&val, NULL, 0);
r = db->put(db, txn, &key, &val, DB_YESOVERWRITE); assert_zero(r);
}
r = txn->commit(txn, 0); assert_zero(r);
}
static void
run_test(int ndbs, int nrows) {
int r;
DB_ENV *env = NULL;
r = db_env_create(&env, 0); assert_zero(r);
r = env->set_generate_row_callback_for_put(env, put_callback); assert_zero(r);
r = env->set_generate_row_callback_for_del(env, del_callback); assert_zero(r);
r = env->open(env, ENVDIR, DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
DB *db[ndbs];
for (int dbnum = 0; dbnum < ndbs; dbnum++) {
r = db_create(&db[dbnum], env, 0); assert_zero(r);
DBT dbt_dbnum; dbt_init(&dbt_dbnum, &dbnum, sizeof dbnum);
r = db[dbnum]->set_descriptor(db[dbnum], 1, &dbt_dbnum); assert_zero(r);
char dbname[32]; sprintf(dbname, "%d.tdb", dbnum);
r = db[dbnum]->open(db[dbnum], NULL, dbname, NULL, DB_BTREE, DB_AUTO_COMMIT+DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
}
for (int dbnum = 0; dbnum < ndbs-1; dbnum++) {
if (dbnum == 0)
populate_primary(env, db[dbnum], ndbs, nrows);
else
populate_secondary(env, db[dbnum], dbnum, nrows);
}
DB_TXN *indexer_txn = NULL;
r = env->txn_begin(env, NULL, &indexer_txn, 0); assert_zero(r);
DB_INDEXER *indexer = NULL;
uint32_t db_flags = 0;
r = env->create_indexer(env, indexer_txn, &indexer, db[0], 1, &db[ndbs-1], &db_flags, 0); assert_zero(r);
update_diagonal(env, db, ndbs, nrows);
r = indexer->build(indexer); assert_zero(r);
r = indexer->close(indexer); assert_zero(r);
r = indexer_txn->commit(indexer_txn, 0); assert_zero(r);
for (int dbnum = 0; dbnum < ndbs; dbnum++)
verify_seq(env, db[dbnum], dbnum, ndbs, nrows);
for (int dbnum = 0; dbnum < ndbs; dbnum++)
r = db[dbnum]->close(db[dbnum], 0); assert_zero(r);
r = env->close(env, 0); assert_zero(r);
}
int
test_main(int argc, char * const argv[]) {
int r;
int ndbs = 2;
int nrows = 2;
// parse_args(argc, argv);
for (int i = 1; i < argc; i++) {
char * const arg = argv[i];
if (strcmp(arg, "-v") == 0) {
verbose++;
continue;
}
if (strcmp(arg, "-q") == 0) {
verbose = 0;
continue;
}
if (strcmp(arg, "--ndbs") == 0 && i+1 < argc) {
ndbs = atoi(argv[++i]);
continue;
}
if (strcmp(arg, "--nrows") == 0 && i+1 < argc) {
nrows = atoi(argv[++i]);
continue;
}
}
r = system("rm -rf " ENVDIR); assert_zero(r);
r = toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
run_test(ndbs, nrows);
return 0;
}
......@@ -1570,11 +1570,11 @@ locked_env_set_generate_row_callback_for_del(DB_ENV *env, generate_row_for_del_f
}
static int env_put_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
const DBT *src_key, const DBT *src_val,
uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array);
static int env_del_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
const DBT *src_key, const DBT *src_val,
uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array);
static int env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
......@@ -1585,20 +1585,20 @@ static int env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
uint32_t num_vals, DBT *vals);
static int
locked_env_put_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn, const DBT *key, const DBT *val, uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array) {
locked_env_put_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn, const DBT *src_key, const DBT *src_val, uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array) {
int r = env_check_avail_fs_space(env);
if (r == 0) {
toku_ydb_lock();
r = env_put_multiple(env, src_db, txn, key, val, num_dbs, db_array, keys, vals, flags_array);
r = env_put_multiple(env, src_db, txn, src_key, src_val, num_dbs, db_array, keys, vals, flags_array);
toku_ydb_unlock();
}
return r;
}
static int
locked_env_del_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn, const DBT *key, const DBT *val, uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array) {
locked_env_del_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn, const DBT *src_key, const DBT *src_val, uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array) {
toku_ydb_lock();
int r = env_del_multiple(env, src_db, txn, key, val, num_dbs, db_array, keys, flags_array);
int r = env_del_multiple(env, src_db, txn, src_key, src_val, num_dbs, db_array, keys, flags_array);
toku_ydb_unlock();
return r;
}
......@@ -3843,21 +3843,29 @@ lookup_src_db(uint32_t num_dbs, DB *db_array[], DB *src_db) {
}
static int
do_del_multiple(DB_TXN *txn, uint32_t num_dbs, DB *db_array[], DBT keys[]) {
do_del_multiple(DB_TXN *txn, uint32_t num_dbs, DB *db_array[], DBT keys[], DB *src_db, const DBT *src_key) {
src_db = src_db; src_key = src_key;
int r = 0;
TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn;
for (uint32_t which_db = 0; r == 0 && which_db < num_dbs; which_db++) {
DB *db = db_array[which_db];
// if db is being indexed by an indexer, then insert a delete message into the db if the src key is to the left or equal to the
// indexers cursor. we have to get the src_db from the indexer and find it in the db_array.
int do_delete = TRUE;
DB_INDEXER *indexer = toku_db_get_indexer(db);
if (indexer) { // if this db is the index under construction
DB *src_db = toku_indexer_get_src_db(indexer);
invariant(src_db != NULL);
uint32_t which_src_db = lookup_src_db(num_dbs, db_array, src_db);
if (which_src_db >= num_dbs)
r = EINVAL;
else
do_delete = !toku_indexer_is_key_right_of_le_cursor(indexer, src_db, &keys[which_src_db]);
DB *indexer_src_db = toku_indexer_get_src_db(indexer);
invariant(indexer_src_db != NULL);
const DBT *indexer_src_key;
if (src_db == indexer_src_db)
indexer_src_key = src_key;
else {
uint32_t which_src_db = lookup_src_db(num_dbs, db_array, indexer_src_db);
invariant(which_src_db < num_dbs);
indexer_src_key = &keys[which_src_db];
}
do_delete = !toku_indexer_is_key_right_of_le_cursor(indexer, indexer_src_db, indexer_src_key);
}
if (r == 0 && do_delete) {
r = toku_brt_maybe_delete(db->i->brt, &keys[which_db], ttxn, FALSE, ZERO_LSN, FALSE);
......@@ -3871,8 +3879,8 @@ env_del_multiple(
DB_ENV *env,
DB *src_db,
DB_TXN *txn,
const DBT *key,
const DBT *val,
const DBT *src_key,
const DBT *src_val,
uint32_t num_dbs,
DB **db_array,
DBT *keys,
......@@ -3880,14 +3888,6 @@ env_del_multiple(
{
int r;
DBT del_keys[num_dbs];
BOOL multi_accounting = TRUE; // use num_multi_delete accountability counters
// special case single DB
if (num_dbs == 1 && src_db == db_array[0]) {
multi_accounting = FALSE;
r = toku_db_del(db_array[0], txn, (DBT *) key, flags_array[0]);
goto cleanup;
}
HANDLE_PANICKED_ENV(env);
......@@ -3911,11 +3911,11 @@ env_del_multiple(
DB *db = db_array[which_db];
if (db == src_db) {
del_keys[which_db] = *key;
del_keys[which_db] = *src_key;
}
else {
//Generate the key
r = env->i->generate_row_for_del(db, src_db, &keys[which_db], key, val);
r = env->i->generate_row_for_del(db, src_db, &keys[which_db], src_key, src_val);
if (r != 0) goto cleanup;
del_keys[which_db] = keys[which_db];
}
......@@ -3948,19 +3948,17 @@ env_del_multiple(
if (num_dbs == 1)
r = log_del_single(txn, brts[0], &del_keys[0]);
else
r = log_del_multiple(txn, src_db, key, val, num_dbs, brts, del_keys);
r = log_del_multiple(txn, src_db, src_key, src_val, num_dbs, brts, del_keys);
if (r == 0)
r = do_del_multiple(txn, num_dbs, db_array, del_keys);
r = do_del_multiple(txn, num_dbs, db_array, del_keys, src_db, src_key);
}
cleanup:
if (multi_accounting) {
if (r == 0)
num_multi_deletes += num_dbs;
else
num_multi_deletes_fail += num_dbs;
}
if (r == 0)
num_multi_deletes += num_dbs;
else
num_multi_deletes_fail += num_dbs;
return r;
}
......@@ -4549,21 +4547,28 @@ log_put_multiple(DB_TXN *txn, DB *src_db, const DBT *src_key, const DBT *src_val
}
static int
do_put_multiple(DB_TXN *txn, uint32_t num_dbs, DB *db_array[], DBT keys[], DBT vals[]) {
do_put_multiple(DB_TXN *txn, uint32_t num_dbs, DB *db_array[], DBT keys[], DBT vals[], DB *src_db, const DBT *src_key) {
int r = 0;
TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn;
for (uint32_t which_db = 0; r == 0 && which_db < num_dbs; which_db++) {
DB *db = db_array[which_db];
// if db is being indexed by an indexer, then put into that db if the src key is to the left or equal to the
// indexers cursor. we have to get the src_db from the indexer and find it in the db_array.
int do_put = TRUE;
DB_INDEXER *indexer = toku_db_get_indexer(db);
if (indexer) { // if this db is the index under construction
DB *src_db = toku_indexer_get_src_db(indexer);
invariant(src_db != NULL);
uint32_t which_src_db = lookup_src_db(num_dbs, db_array, src_db);
if (which_src_db >= num_dbs)
r = EINVAL;
else
do_put = !toku_indexer_is_key_right_of_le_cursor(indexer, src_db, &keys[which_src_db]);
DB *indexer_src_db = toku_indexer_get_src_db(indexer);
invariant(indexer_src_db != NULL);
const DBT *indexer_src_key;
if (src_db == indexer_src_db)
indexer_src_key = src_key;
else {
uint32_t which_src_db = lookup_src_db(num_dbs, db_array, indexer_src_db);
invariant(which_src_db < num_dbs);
indexer_src_key = &keys[which_src_db];
}
do_put = !toku_indexer_is_key_right_of_le_cursor(indexer, src_db, indexer_src_key);
}
if (r == 0 && do_put) {
r = toku_brt_maybe_insert(db->i->brt, &keys[which_db], &vals[which_db], ttxn, FALSE, ZERO_LSN, FALSE, BRT_INSERT);
......@@ -4577,8 +4582,8 @@ env_put_multiple(
DB_ENV *env,
DB *src_db,
DB_TXN *txn,
const DBT *key,
const DBT *val,
const DBT *src_key,
const DBT *src_val,
uint32_t num_dbs,
DB **db_array,
DBT *keys,
......@@ -4588,14 +4593,6 @@ env_put_multiple(
int r;
DBT put_keys[num_dbs];
DBT put_vals[num_dbs];
BOOL multi_accounting = TRUE; // use num_multi_insert accountability counters
// special case for a single DB
if (num_dbs == 1 && src_db == db_array[0]) {
multi_accounting = FALSE;
r = toku_db_put(src_db, txn, (DBT *) key, (DBT *) val, flags_array[0]);
goto cleanup;
}
HANDLE_PANICKED_ENV(env);
......@@ -4620,11 +4617,11 @@ env_put_multiple(
//Generate the row
if (db == src_db) {
put_keys[which_db] = *key;
put_vals[which_db] = *val;
put_keys[which_db] = *src_key;
put_vals[which_db] = *src_val;
}
else {
r = env->i->generate_row_for_put(db, src_db, &keys[which_db], &vals[which_db], key, val);
r = env->i->generate_row_for_put(db, src_db, &keys[which_db], &vals[which_db], src_key, src_val);
if (r != 0) goto cleanup;
put_keys[which_db] = keys[which_db];
put_vals[which_db] = vals[which_db];
......@@ -4663,77 +4660,18 @@ env_put_multiple(
if (num_dbs == 1)
r = log_put_single(txn, brts[0], &put_keys[0], &put_vals[0]);
else
r = log_put_multiple(txn, src_db, key, val, num_dbs, brts);
r = log_put_multiple(txn, src_db, src_key, src_val, num_dbs, brts);
if (r == 0)
r = do_put_multiple(txn, num_dbs, db_array, put_keys, put_vals);
r = do_put_multiple(txn, num_dbs, db_array, put_keys, put_vals, src_db, src_key);
}
cleanup:
if (multi_accounting) {
if (r == 0)
num_multi_inserts += num_dbs;
else
num_multi_inserts_fail += num_dbs;
}
return r;
}
static int
dbt_cmp(const DBT *a, const DBT *b) {
if (a->size < b->size)
return -1;
if (a->size > b->size)
return +1;
return memcmp(a->data, b->data, a->size);
}
static int
update_single(
DB_ENV *env,
DB *db,
uint32_t flags,
DB_TXN *txn,
DBT *old_key,
DBT *old_data,
DBT *new_key,
DBT *new_data)
{
int r = 0;
uint32_t lock_flags;
uint32_t remaining_flags;
lock_flags = get_prelocked_flags(flags);
remaining_flags = flags & ~lock_flags;
r = toku_grab_read_lock_on_directory(db, txn);
if (r != 0) goto cleanup;
int (*cmpfun)(DB *db, const DBT *a, const DBT *b) = toku_builtin_compare_fun;
if (env->i->bt_compare)
cmpfun = env->i->bt_compare;
BOOL key_eq = cmpfun(db, old_key, new_key) == 0;
if (!key_eq) {
//Check overwrite constraints only in the case where
// the keys are not equal.
// If the keys are equal, then we do not care of the flag is DB_NOOVERWRITE or DB_YESOVERWRITE
r = db_put_check_overwrite_constraint(db, txn,
new_key,
lock_flags, remaining_flags);
if (r != 0) goto cleanup;
r = toku_db_del(db, txn, (DBT *) old_key, DB_DELETE_ANY);
}
if (r == 0 && (!key_eq || !(dbt_cmp(old_data, new_data) == 0))) {
r = toku_db_put(db, txn, (DBT *) new_key, (DBT *) new_data, DB_YESOVERWRITE);
}
cleanup:
if (r == 0)
num_updates++;
num_multi_inserts += num_dbs;
else
num_updates_fail++;
num_multi_inserts_fail += num_dbs;
return r;
}
......@@ -4745,22 +4683,6 @@ env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
uint32_t num_keys, DBT keys[],
uint32_t num_vals, DBT vals[]) {
int r = 0;
BOOL multi_accounting = TRUE; // use num_multi_update accountability counters
// special case for a single DB
if (num_dbs == 1 && src_db == db_array[0]) {
multi_accounting = FALSE;
r = update_single(env,
db_array[0],
flags_array[0],
txn,
old_src_key,
old_src_data,
new_src_key,
new_src_data
);
goto cleanup;
}
HANDLE_PANICKED_ENV(env);
......@@ -4790,15 +4712,10 @@ env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
uint32_t lock_flags[num_dbs];
uint32_t remaining_flags[num_dbs];
int (*cmpfun)(DB *db, const DBT *a, const DBT *b) = toku_builtin_compare_fun;
if (env->i->bt_compare)
cmpfun = env->i->bt_compare;
for (uint32_t which_db = 0; which_db < num_dbs; which_db++) {
DB *db = db_array[which_db];
DBT curr_old_key, curr_new_key, curr_new_val;
lock_flags[which_db] = get_prelocked_flags(flags_array[which_db]);
remaining_flags[which_db] = flags_array[which_db] & ~lock_flags[which_db];
......@@ -4832,6 +4749,7 @@ env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
curr_new_key = keys[which_db];
curr_new_val = vals[which_db];
}
toku_dbt_cmp cmpfun = toku_db_get_compare_fun(db);
BOOL key_eq = cmpfun(db, &curr_old_key, &curr_new_key) == 0;
if (!key_eq) {
r = toku_grab_read_lock_on_directory(db, txn);
......@@ -4890,8 +4808,8 @@ env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
r = log_del_single(txn, del_brts[0], &del_keys[0]);
else
r = log_del_multiple(txn, src_db, old_src_key, old_src_data, n_del_dbs, del_brts, del_keys);
if (r == 0)
r = do_del_multiple(txn, n_del_dbs, del_dbs, del_keys);
if (r == 0)
r = do_del_multiple(txn, n_del_dbs, del_dbs, del_keys, src_db, old_src_key);
}
if (r == 0 && n_put_dbs > 0) {
......@@ -4900,17 +4818,15 @@ env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
else
r = log_put_multiple(txn, src_db, new_src_key, new_src_data, n_put_dbs, put_brts);
if (r == 0)
r = do_put_multiple(txn, n_put_dbs, put_dbs, put_keys, put_vals);
r = do_put_multiple(txn, n_put_dbs, put_dbs, put_keys, put_vals, src_db, new_src_key);
}
}
cleanup:
if (multi_accounting) {
if (r == 0)
num_multi_updates += num_dbs;
else
num_multi_updates_fail += num_dbs;
}
if (r == 0)
num_multi_updates += num_dbs;
else
num_multi_updates_fail += num_dbs;
return r;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment