Commit f852b1c2 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

#3129 fix recover of insert multiple and delete multiple with a valid source filenum closes[t:3129]

git-svn-id: file:///svn/toku/tokudb@26501 c7de825b-a66e-492c-adef-691d508d4ae1
parent c373e32b
......@@ -829,26 +829,26 @@ static int toku_recover_enq_insert_multiple (struct logtype_enq_insert_multiple
assert(r == 0);
assert(txn!=NULL);
DB *src_db = NULL;
BOOL do_inserts = TRUE;
{
struct file_map_tuple *tuple = NULL;
r = file_map_find(&renv->fmap, l->src_filenum, &tuple);
if (l->src_filenum.fileid == FILENUM_NONE.fileid)
assert(r==DB_NOTFOUND);
else {
assert(r==0); //How do we continue if src_db is specified but missing?
if (r == 0)
src_db = tuple->brt->db;
else
do_inserts = FALSE; // src file was probably deleted
}
}
uint32_t file;
DBT src_key, src_val, dest_key, dest_val;
toku_fill_dbt(&src_key, l->src_key.data, l->src_key.len);
toku_fill_dbt(&src_val, l->src_val.data, l->src_val.len);
toku_init_dbt(&dest_key);
toku_init_dbt(&dest_val);
dest_key.flags = DB_DBT_REALLOC;
dest_val.flags = DB_DBT_REALLOC;
toku_init_dbt_flags(&dest_key, DB_DBT_REALLOC);
toku_init_dbt_flags(&dest_val, DB_DBT_REALLOC);
for (file = 0; file < l->dest_filenums.num; file++) {
for (uint32_t file = 0; do_inserts && file < l->dest_filenums.num; file++) {
struct file_map_tuple *tuple = NULL;
r = file_map_find(&renv->fmap, l->dest_filenums.filenums[file], &tuple);
if (r==0) {
......@@ -858,18 +858,15 @@ static int toku_recover_enq_insert_multiple (struct logtype_enq_insert_multiple
assert(r==0);
r = toku_brt_maybe_insert(tuple->brt, &dest_key, &dest_val, txn, TRUE, l->lsn, FALSE, BRT_INSERT);
assert(r == 0);
//flags==0 means generate_row_for_put callback changed it
//(and freed any memory necessary to do so) so that values are now stored
//in temporary memory that does not need to be freed. We need to continue
//using DB_DBT_REALLOC however.
if (dest_key.flags == 0) {
toku_init_dbt(&dest_key);
dest_key.flags = DB_DBT_REALLOC;
}
if (dest_val.flags == 0) {
toku_init_dbt(&dest_val);
dest_val.flags = DB_DBT_REALLOC;
}
if (dest_key.flags == 0)
toku_init_dbt_flags(&dest_key, DB_DBT_REALLOC);
if (dest_val.flags == 0)
toku_init_dbt_flags(&dest_val, DB_DBT_REALLOC);
}
}
if (dest_key.data) toku_free(dest_key.data); //TODO: #2321 May need windows hack
......@@ -890,24 +887,26 @@ static int toku_recover_enq_delete_multiple (struct logtype_enq_delete_multiple
assert(r == 0);
assert(txn!=NULL);
DB *src_db = NULL;
BOOL do_deletes = TRUE;
{
struct file_map_tuple *tuple = NULL;
r = file_map_find(&renv->fmap, l->src_filenum, &tuple);
if (l->src_filenum.fileid == FILENUM_NONE.fileid)
assert(r==DB_NOTFOUND);
else {
assert(r==0); //How do we continue if src_db is specified but missing?
if (r == 0)
src_db = tuple->brt->db;
else
do_deletes = FALSE; // src file was probably deleted
}
}
uint32_t file;
DBT src_key, src_val, dest_key;
toku_fill_dbt(&src_key, l->src_key.data, l->src_key.len);
toku_fill_dbt(&src_val, l->src_val.data, l->src_val.len);
toku_init_dbt(&dest_key);
dest_key.flags = DB_DBT_REALLOC;
toku_init_dbt_flags(&dest_key, DB_DBT_REALLOC);
for (file = 0; file < l->dest_filenums.num; file++) {
for (uint32_t file = 0; do_deletes && file < l->dest_filenums.num; file++) {
struct file_map_tuple *tuple = NULL;
r = file_map_find(&renv->fmap, l->dest_filenums.filenums[file], &tuple);
if (r==0) {
......@@ -917,12 +916,11 @@ static int toku_recover_enq_delete_multiple (struct logtype_enq_delete_multiple
assert(r==0);
r = toku_brt_maybe_delete(tuple->brt, &dest_key, txn, TRUE, l->lsn, FALSE);
assert(r == 0);
//flags==0 indicates the return values are stored in temporary memory that does
//not need to be freed. We need to continue using DB_DBT_REALLOC however.
if (dest_key.flags == 0) {
toku_init_dbt(&dest_key);
dest_key.flags = DB_DBT_REALLOC;
}
if (dest_key.flags == 0)
toku_init_dbt_flags(&dest_key, DB_DBT_REALLOC);
}
}
if (dest_key.flags & DB_DBT_REALLOC && dest_key.data) toku_free(dest_key.data); //TODO: #2321 May need windows hack
......
......@@ -144,6 +144,7 @@ BDB_DONTRUN_TESTS = \
recover-compare-db-descriptor \
recover-del-multiple \
recover-del-multiple-abort \
recover-del-multiple-srcdb-fdelete-all \
recover-delboth-after-checkpoint \
recover-delboth-checkpoint \
recover-fclose-in-checkpoint \
......@@ -153,6 +154,7 @@ BDB_DONTRUN_TESTS = \
recover-put-multiple-abort \
recover-put-multiple-fdelete-all \
recover-put-multiple-fdelete-some \
recover-put-multiple-srcdb-fdelete-all \
recover-split-checkpoint \
recover-tablelock \
recover-test-logsuppress \
......
#include "test.h"
// verify recovery of a delete multiple log entry
static const int envflags = DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE;
static int
get_key(int i, int dbnum) {
return htonl(i + dbnum);
}
static void
get_data(int *v, int i, int ndbs) {
for (int dbnum = 0; dbnum < ndbs; dbnum++) {
v[dbnum] = get_key(i, dbnum);
}
}
static int
put_callback(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_data, const DBT *src_key, const DBT *src_data) {
dest_db = dest_db; src_db = src_db; dest_key = dest_key; dest_data = dest_data; src_key = src_key; src_data = src_data;
assert(src_db != NULL);
unsigned int dbnum;
assert(dest_db->descriptor->dbt.size == sizeof dbnum);
memcpy(&dbnum, dest_db->descriptor->dbt.data, sizeof dbnum);
assert(dbnum < src_data->size / sizeof (int));
int *pri_data = (int *) src_data->data;
switch (dest_key->flags) {
case 0:
dest_key->size = sizeof (int);
dest_key->data = &pri_data[dbnum];
break;
case DB_DBT_REALLOC:
dest_key->size = sizeof (int);
dest_key->data = toku_realloc(dest_key->data, dest_key->size);
memcpy(dest_key->data, &pri_data[dbnum], dest_key->size);
break;
default:
assert(0);
}
if (dest_data) {
switch (dest_data->flags) {
case 0:
if (dbnum == 0) {
dest_data->size = src_data->size;
dest_data->data = src_data->data;
} else {
dest_data->size = 0;
}
break;
case DB_DBT_REALLOC:
assert(0);
default:
assert(0);
}
}
return 0;
}
static int
del_callback(DB *dest_db, DB *src_db, DBT *dest_key, const DBT *src_key, const DBT *src_data) {
return put_callback(dest_db, src_db, dest_key, NULL, src_key, src_data);
}
static void
run_test(int ndbs, int nrows) {
int r;
r = system("rm -rf " ENVDIR); assert_zero(r);
r = toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
DB_ENV *env;
r = db_env_create(&env, 0); assert_zero(r);
r = env->set_generate_row_callback_for_put(env, put_callback); assert_zero(r);
r = env->set_generate_row_callback_for_del(env, del_callback); assert_zero(r);
r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
DB *db[ndbs];
for (int dbnum = 0; dbnum < ndbs; dbnum++) {
r = db_create(&db[dbnum], env, 0);
assert_zero(r);
DBT dbt_dbnum; dbt_init(&dbt_dbnum, &dbnum, sizeof dbnum);
r = db[dbnum]->set_descriptor(db[dbnum], 1, &dbt_dbnum);
assert_zero(r);
char dbname[32]; sprintf(dbname, "%d.tdb", dbnum);
r = db[dbnum]->open(db[dbnum], NULL, dbname, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666);
assert_zero(r);
}
DB_TXN *txn;
r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
for (int i = 0; i < nrows; i++) {
int k = get_key(i, 0);
int v[ndbs]; get_data(v, i, ndbs);
DBT pri_key; dbt_init(&pri_key, &k, sizeof k);
DBT pri_val; dbt_init(&pri_val, &v[0], sizeof v);
DBT keys[ndbs]; memset(keys, 0, sizeof keys);
DBT vals[ndbs]; memset(vals, 0, sizeof vals);
uint32_t flags[ndbs]; memset(flags, 0, sizeof flags);
r = env->put_multiple(env, ndbs > 0 ? db[0] : NULL, txn, &pri_key, &pri_val, ndbs, db, keys, vals, flags);
assert_zero(r);
}
r = txn->commit(txn, 0); assert_zero(r);
r = env->txn_checkpoint(env, 0, 0, 0); assert_zero(r);
r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
for (int i = 0; i < nrows; i++) {
int k = get_key(i, 0);
DBT pri_key; dbt_init(&pri_key, &k, sizeof k);
int v[ndbs]; get_data(v, i, ndbs);
DBT pri_data; dbt_init(&pri_data, &v[0], sizeof v);
DBT keys[ndbs]; memset(keys, 0, sizeof keys);
uint32_t flags[ndbs]; memset(flags, 0, sizeof flags);
r = env->del_multiple(env, ndbs > 0 ? db[0] : NULL, txn, &pri_key, &pri_data, ndbs, db, keys, flags);
assert_zero(r);
}
for (int dbnum = 0; dbnum < ndbs; dbnum++) {
r = db[dbnum]->close(db[dbnum], 0); assert_zero(r);
char dbname[32]; sprintf(dbname, "%d.tdb", dbnum);
r = env->dbremove(env, txn, dbname, NULL, 0); assert_zero(r);
}
r = txn->commit(txn, 0); assert_zero(r);
toku_hard_crash_on_purpose();
}
static void
verify_empty(DB_ENV *env, DB *db) {
int r;
DB_TXN *txn = NULL;
r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
DBC *cursor = NULL;
r = db->cursor(db, txn, &cursor, 0); assert_zero(r);
DBT key; memset(&key, 0, sizeof key);
DBT val; memset(&val, 0, sizeof val);
r = cursor->c_get(cursor, &key, &val, DB_NEXT);
assert(r == DB_NOTFOUND);
r = cursor->c_close(cursor); assert_zero(r);
r = txn->commit(txn, 0); assert_zero(r);
}
static void
verify_all(DB_ENV *env, int ndbs) {
int r;
for (int dbnum = 0; dbnum < ndbs; dbnum++) {
DB *db = NULL;
r = db_create(&db, env, 0);
assert_zero(r);
char dbname[32]; sprintf(dbname, "%d.tdb", dbnum);
r = db->open(db, NULL, dbname, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666);
assert_zero(r);
verify_empty(env, db);
r = db->close(db, 0);
assert_zero(r);
}
}
static void
run_recover(int ndbs, int UU(nrows)) {
int r;
DB_ENV *env;
r = db_env_create(&env, 0); assert_zero(r);
r = env->set_generate_row_callback_for_put(env, put_callback); assert_zero(r);
r = env->set_generate_row_callback_for_del(env, del_callback); assert_zero(r);
r = env->open(env, ENVDIR, envflags|DB_RECOVER, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
verify_all(env, ndbs);
r = env->close(env, 0); assert_zero(r);
}
static int
usage(void) {
return 1;
}
int
test_main (int argc, char * const argv[]) {
BOOL do_test = FALSE;
BOOL do_recover = FALSE;
int ndbs = 2;
int nrows = 1;
for (int i = 1; i < argc; i++) {
char * const arg = argv[i];
if (strcmp(arg, "-v") == 0) {
verbose++;
continue;
}
if (strcmp(arg, "-q") == 0) {
verbose--;
if (verbose < 0)
verbose = 0;
continue;
}
if (strcmp(arg, "--test") == 0) {
do_test = TRUE;
continue;
}
if (strcmp(arg, "--recover") == 0) {
do_recover = TRUE;
continue;
}
if (strcmp(arg, "--ndbs") == 0 && i+1 < argc) {
ndbs = atoi(argv[++i]);
continue;
}
if (strcmp(arg, "--nrows") == 0 && i+1 < argc) {
nrows = atoi(argv[++i]);
continue;
}
if (strcmp(arg, "--help") == 0) {
return usage();
}
}
if (do_test)
run_test(ndbs, nrows);
if (do_recover)
run_recover(ndbs, nrows);
return 0;
}
// this test makes sure the LSN filtering is used during recovery of put_multiple
#include <sys/stat.h>
#include <fcntl.h>
#include "test.h"
const int envflags = DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE;
char *namea="a.db";
char *nameb="b.db";
enum {num_dbs = 2};
static DBT dest_keys[num_dbs];
static DBT dest_vals[num_dbs];
BOOL do_test=FALSE, do_recover=FALSE;
static int
put_multiple_generate(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val) {
assert(src_db != NULL);
assert(dest_db->descriptor->dbt.size == 4);
uint32_t which = *(uint32_t*)dest_db->descriptor->dbt.data;
assert(which < num_dbs);
if (dest_key->data) toku_free(dest_key->data);
if (dest_val->data) toku_free(dest_val->data);
dest_key->data = toku_xmemdup (src_key->data, src_key->size);
dest_key->size = src_key->size;
dest_val->data = toku_xmemdup (src_val->data, src_val->size);
dest_val->size = src_val->size;
return 0;
}
static void run_test (void) {
int r;
r = system("rm -rf " ENVDIR);
CKERR(r);
toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO);
DB_ENV *env;
r = db_env_create(&env, 0); CKERR(r);
r = env->set_generate_row_callback_for_put(env, put_multiple_generate);
CKERR(r);
r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
// create a txn that never closes, forcing recovery to run from the beginning of the log
{
DB_TXN *oldest_living_txn;
r = env->txn_begin(env, NULL, &oldest_living_txn, 0); CKERR(r);
}
DBT descriptor;
uint32_t which;
for (which = 0; which < num_dbs; which++) {
dbt_init_realloc(&dest_keys[which]);
dbt_init_realloc(&dest_vals[which]);
}
dbt_init(&descriptor, &which, sizeof(which));
DB *dba;
DB *dbb;
r = db_create(&dba, env, 0); CKERR(r);
r = db_create(&dbb, env, 0); CKERR(r);
which = 0;
r = dba->set_descriptor(dba, 1, &descriptor); CKERR(r);
which = 1;
r = dbb->set_descriptor(dbb, 1, &descriptor); CKERR(r);
r = dba->open(dba, NULL, namea, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
r = dbb->open(dbb, NULL, nameb, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
r = env->txn_checkpoint(env, 0, 0, 0); CKERR(r);
DB *dbs[num_dbs] = {dba, dbb};
uint32_t flags[num_dbs] = {DB_YESOVERWRITE, DB_YESOVERWRITE};
// txn_begin; insert <a,a>; txn_abort
{
DB_TXN *txn;
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
DBT k={.data="a", .size=2};
DBT v={.data="a", .size=2};
r = env->put_multiple(env, num_dbs > 0 ? dbs[0] : NULL, txn, &k, &v, num_dbs, dbs, dest_keys, dest_vals, flags);
CKERR(r);
r = txn->abort(txn); CKERR(r);
}
r = dbb->close(dbb, 0); CKERR(r);
r = db_create(&dbb, env, 0); CKERR(r);
r = dbb->open(dbb, NULL, nameb, NULL, DB_BTREE, DB_AUTO_COMMIT, 0666); CKERR(r);
dbs[1] = dbb;
// txn_begin; insert <a,b>;
{
DB_TXN *txn;
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
DBT k={.data="a", .size=2};
DBT v={.data="b", .size=2};
r = env->put_multiple(env, num_dbs > 0 ? dbs[0] : NULL, txn, &k, &v, num_dbs, dbs, dest_keys, dest_vals, flags);
CKERR(r);
r = txn->commit(txn, 0); CKERR(r);
}
{
DB_TXN *txn;
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
r = dba->close(dba, 0); CKERR(r);
r = env->dbremove(env, txn, namea, NULL, 0); CKERR(r);
r = dbb->close(dbb, 0); CKERR(r);
r = env->dbremove(env, txn, nameb, NULL, 0); CKERR(r);
r = txn->commit(txn, 0); CKERR(r);
}
r = env->log_flush(env, NULL); CKERR(r);
// abort the process
toku_hard_crash_on_purpose();
}
static void run_recover (void) {
DB_ENV *env;
int r;
// Recovery starts from oldest_living_txn, which is older than any inserts done in run_test,
// so recovery always runs over the entire log.
// run recovery
r = db_env_create(&env, 0); CKERR(r);
r = env->set_generate_row_callback_for_put(env, put_multiple_generate);
CKERR(r);
r = env->open(env, ENVDIR, envflags + DB_RECOVER, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
// verify the data
{
DB *db;
r = db_create(&db, env, 0); CKERR(r);
r = db->open(db, NULL, namea, NULL, DB_UNKNOWN, DB_AUTO_COMMIT, 0666); CKERR2(r, ENOENT);
r = db->close(db, 0); CKERR(r);
}
{
DB *db;
r = db_create(&db, env, 0); CKERR(r);
r = db->open(db, NULL, nameb, NULL, DB_UNKNOWN, DB_AUTO_COMMIT, 0666); CKERR2(r, ENOENT);
r = db->close(db, 0); CKERR(r);
}
r = env->close(env, 0); CKERR(r);
exit(0);
}
const char *cmd;
static void test_parse_args (int argc, char * const argv[]) {
int resultcode;
cmd = argv[0];
argc--; argv++;
while (argc>0) {
if (strcmp(argv[0], "-v") == 0) {
verbose++;
} else if (strcmp(argv[0],"-q")==0) {
verbose--;
if (verbose<0) verbose=0;
} else if (strcmp(argv[0], "--test")==0) {
do_test=TRUE;
} else if (strcmp(argv[0], "--recover") == 0) {
do_recover=TRUE;
} else if (strcmp(argv[0], "-h")==0) {
resultcode=0;
do_usage:
fprintf(stderr, "Usage:\n%s [-v|-q]* [-h] {--test | --recover } \n", cmd);
exit(resultcode);
} else {
fprintf(stderr, "Unknown arg: %s\n", argv[0]);
resultcode=1;
goto do_usage;
}
argc--;
argv++;
}
}
int test_main (int argc, char * const argv[]) {
test_parse_args(argc, argv);
if (do_test) {
run_test();
} else if (do_recover) {
run_recover();
}
return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment