Commit 50d65e92 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:4518], merge fix to main

git-svn-id: file:///svn/toku/tokudb@41810 c7de825b-a66e-492c-adef-691d508d4ae1
parent 88eda5c7
......@@ -168,6 +168,25 @@ static void free_loader(DB_LOADER *loader)
static const char *loader_temp_prefix = "tokuld"; // #2536
static const char *loader_temp_suffix = "XXXXXX";
static int brt_loader_close_and_redirect(DB_LOADER *loader) {
int r;
// use the bulk loader
// in case you've been looking - here is where the real work is done!
r = toku_brt_loader_close(loader->i->brt_loader,
loader->i->error_callback, loader->i->error_extra,
loader->i->poll_func, loader->i->poll_extra);
if ( r==0 ) {
for (int i=0; i<loader->i->N; i++) {
toku_ydb_lock(); //Must hold ydb lock for dictionary_redirect.
r = toku_dictionary_redirect(loader->i->inames_in_env[i],
loader->i->dbs[i]->i->brt,
db_txn_struct_i(loader->i->txn)->tokutxn);
toku_ydb_unlock();
if ( r!=0 ) break;
}
}
return r;
}
// loader_flags currently has three possible values:
// 0 use brt loader
......@@ -190,7 +209,7 @@ int toku_loader_create_loader(DB_ENV *env,
*blp = NULL; // set later when created
DB_LOADER *loader;
DB_LOADER *loader = NULL;
XCALLOC(loader); // init to all zeroes (thus initializing the error_callback and poll_func)
XCALLOC(loader->i); // init to all zeroes (thus initializing all pointers to NULL)
......@@ -207,7 +226,7 @@ int toku_loader_create_loader(DB_ENV *env,
int n = snprintf(loader->i->temp_file_template, MAX_FILE_SIZE, "%s/%s%s", env->i->real_tmp_dir, loader_temp_prefix, loader_temp_suffix);
if ( !(n>0 && n<MAX_FILE_SIZE) ) {
rval = -1;
goto create_exit;
goto create_exit;
}
memset(&loader->i->err_key, 0, sizeof(loader->i->err_key));
......@@ -225,8 +244,7 @@ int toku_loader_create_loader(DB_ENV *env,
// lock tables and check empty
for(int i=0;i<N;i++) {
if (!(loader_flags&DB_PRELOCKED_WRITE)) {
BOOL using_puts = (loader->i->loader_flags & LOADER_USE_PUTS) != 0;
r = toku_db_pre_acquire_table_lock(dbs[i], txn, !using_puts);
r = toku_db_pre_acquire_table_lock(dbs[i], txn);
if (r!=0) break;
}
r = !toku_brt_is_empty_fast(dbs[i]->i->brt);
......@@ -234,76 +252,84 @@ int toku_loader_create_loader(DB_ENV *env,
}
if ( r!=0 ) {
rval = -1;
goto create_exit;
goto create_exit;
}
{
brt_compare_func compare_functions[N];
for (int i=0; i<N; i++) {
compare_functions[i] = env->i->bt_compare;
}
// time to open the big kahuna
if ( FALSE && (loader->i->loader_flags & LOADER_USE_PUTS) ) {
XCALLOC_N(loader->i->N, loader->i->ekeys);
XCALLOC_N(loader->i->N, loader->i->evals);
for (int i=0; i<N; i++) {
loader->i->ekeys[i].flags = DB_DBT_REALLOC;
loader->i->evals[i].flags = DB_DBT_REALLOC;
}
loader->i->brt_loader = NULL;
rval = 0;
}
else {
char **XMALLOC_N(N, new_inames_in_env);
BRT *XMALLOC_N(N, brts);
for (int i=0; i<N; i++) {
brts[i] = dbs[i]->i->brt;
}
loader->i->ekeys = NULL;
loader->i->evals = NULL;
LSN load_lsn;
r = ydb_load_inames(env, txn, N, dbs, new_inames_in_env, &load_lsn, use_brt_loader);
if ( r!=0 ) {
toku_free(new_inames_in_env);
toku_free(brts);
rval = r;
goto create_exit;
}
TOKUTXN ttxn = txn ? db_txn_struct_i(txn)->tokutxn : NULL;
r = toku_brt_loader_open(&loader->i->brt_loader,
loader->i->env->i->cachetable,
loader->i->env->i->generate_row_for_put,
src_db,
N,
brts,
(const char **)new_inames_in_env,
compare_functions,
loader->i->temp_file_template,
load_lsn,
ttxn);
if ( r!=0 ) {
toku_free(new_inames_in_env);
toku_free(brts);
rval = r;
goto create_exit;
brt_compare_func compare_functions[N];
for (int i=0; i<N; i++) {
compare_functions[i] = env->i->bt_compare;
}
// time to open the big kahuna
char **XMALLOC_N(N, new_inames_in_env);
BRT *XMALLOC_N(N, brts);
for (int i=0; i<N; i++) {
brts[i] = dbs[i]->i->brt;
}
loader->i->ekeys = NULL;
loader->i->evals = NULL;
LSN load_lsn;
r = ydb_load_inames(env, txn, N, dbs, new_inames_in_env, &load_lsn, use_brt_loader);
if ( r!=0 ) {
toku_free(new_inames_in_env);
toku_free(brts);
rval = r;
goto create_exit;
}
TOKUTXN ttxn = txn ? db_txn_struct_i(txn)->tokutxn : NULL;
r = toku_brt_loader_open(&loader->i->brt_loader,
loader->i->env->i->cachetable,
loader->i->env->i->generate_row_for_put,
src_db,
N,
brts,
(const char **)new_inames_in_env,
compare_functions,
loader->i->temp_file_template,
load_lsn,
ttxn);
if ( r!=0 ) {
toku_free(new_inames_in_env);
toku_free(brts);
rval = r;
goto create_exit;
}
loader->i->inames_in_env = new_inames_in_env;
toku_free(brts);
if (loader->i->loader_flags & LOADER_USE_PUTS) {
XCALLOC_N(loader->i->N, loader->i->ekeys);
XCALLOC_N(loader->i->N, loader->i->evals);
toku_ydb_unlock();
// the following function grabs the ydb lock, so we
// first unlock before calling it
rval = brt_loader_close_and_redirect(loader);
toku_ydb_lock();
assert_zero(rval);
for (int i=0; i<N; i++) {
loader->i->ekeys[i].flags = DB_DBT_REALLOC;
loader->i->evals[i].flags = DB_DBT_REALLOC;
toku_brt_suppress_recovery_logs(dbs[i]->i->brt, db_txn_struct_i(txn)->tokutxn);
}
loader->i->inames_in_env = new_inames_in_env;
toku_free(brts);
rval = 0;
}
loader->i->brt_loader = NULL;
// close the brtloader and skip to the redirection
rval = 0;
}
rval = 0;
}
*blp = loader;
create_exit:
loader_add_refs(loader);
if (rval == 0) {
(void) __sync_fetch_and_add(&STATUS_VALUE(LOADER_CREATE), 1);
(void) __sync_fetch_and_add(&STATUS_VALUE(LOADER_CURRENT), 1);
if (STATUS_VALUE(LOADER_CURRENT) > STATUS_VALUE(LOADER_MAX) )
STATUS_VALUE(LOADER_MAX) = STATUS_VALUE(LOADER_CURRENT); // not worth a lock to make threadsafe, may be inaccurate
(void) __sync_fetch_and_add(&STATUS_VALUE(LOADER_CREATE), 1);
(void) __sync_fetch_and_add(&STATUS_VALUE(LOADER_CURRENT), 1);
if (STATUS_VALUE(LOADER_CURRENT) > STATUS_VALUE(LOADER_MAX) )
STATUS_VALUE(LOADER_MAX) = STATUS_VALUE(LOADER_CURRENT); // not worth a lock to make threadsafe, may be inaccurate
}
else {
(void) __sync_fetch_and_add(&STATUS_VALUE(LOADER_CREATE_FAIL), 1);
(void) __sync_fetch_and_add(&STATUS_VALUE(LOADER_CREATE_FAIL), 1);
free_loader(loader);
}
return rval;
......@@ -343,7 +369,7 @@ int toku_loader_put(DB_LOADER *loader, DBT *key, DBT *val)
goto cleanup;
}
if ( FALSE && (loader->i->loader_flags & LOADER_USE_PUTS) ) {
if (loader->i->loader_flags & LOADER_USE_PUTS) {
r = loader->i->env->put_multiple(loader->i->env,
loader->i->src_db, // src_db
loader->i->txn,
......@@ -394,7 +420,7 @@ int toku_loader_close(DB_LOADER *loader)
if ( loader->i->error_callback != NULL ) {
loader->i->error_callback(loader->i->dbs[loader->i->err_i], loader->i->err_i, loader->i->err_errno, &loader->i->err_key, &loader->i->err_val, loader->i->error_extra);
}
if (TRUE || !(loader->i->loader_flags & LOADER_USE_PUTS ) ) {
if (!(loader->i->loader_flags & LOADER_USE_PUTS ) ) {
r = toku_brt_loader_abort(loader->i->brt_loader, TRUE);
}
else {
......@@ -402,22 +428,8 @@ int toku_loader_close(DB_LOADER *loader)
}
}
else { // no error outstanding
if (TRUE || !(loader->i->loader_flags & LOADER_USE_PUTS ) ) {
// use the bulk loader
// in case you've been looking - here is where the real work is done!
r = toku_brt_loader_close(loader->i->brt_loader,
loader->i->error_callback, loader->i->error_extra,
loader->i->poll_func, loader->i->poll_extra);
if ( r==0 ) {
for (int i=0; i<loader->i->N; i++) {
toku_ydb_lock(); //Must hold ydb lock for dictionary_redirect.
r = toku_dictionary_redirect(loader->i->inames_in_env[i],
loader->i->dbs[i]->i->brt,
db_txn_struct_i(loader->i->txn)->tokutxn);
toku_ydb_unlock();
if ( r!=0 ) break;
}
}
if (!(loader->i->loader_flags & LOADER_USE_PUTS ) ) {
brt_loader_close_and_redirect(loader);
}
}
toku_ydb_lock();
......@@ -441,7 +453,7 @@ int toku_loader_abort(DB_LOADER *loader)
}
}
if (TRUE || !(loader->i->loader_flags & LOADER_USE_PUTS) ) {
if (!(loader->i->loader_flags & LOADER_USE_PUTS) ) {
r = toku_brt_loader_abort(loader->i->brt_loader, TRUE);
}
toku_ydb_lock();
......
......@@ -2776,7 +2776,7 @@ toku_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbna
// further down the stack.
DB* zombie = env_get_zombie_db_with_dname(env, dname);
if (zombie)
r = toku_db_pre_acquire_table_lock(zombie, child, TRUE);
r = toku_db_pre_acquire_table_lock(zombie, child);
if (r!=0 && r!=DB_LOCK_NOTGRANTED)
toku_ydb_do_error(env, r, "Cannot remove dictionary.\n");
}
......@@ -2883,7 +2883,7 @@ toku_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbnam
if (r==0) {
zombie = env_get_zombie_db_with_dname(env, dname);
if (zombie)
r = toku_db_pre_acquire_table_lock(zombie, child, TRUE);
r = toku_db_pre_acquire_table_lock(zombie, child);
if (r!=0 && r!=DB_LOCK_NOTGRANTED)
toku_ydb_do_error(env, r, "Cannot rename dictionary.\n");
}
......
......@@ -647,66 +647,11 @@ toku_db_key_range64(DB* db, DB_TXN* txn __attribute__((__unused__)), DBT* key, u
// needed by loader.c
int
toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn, BOOL UU(just_lock)) {
toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn) {
HANDLE_PANICKED_DB(db);
if (!db->i->lt || !txn) return 0;
int r;
r = get_range_lock(db, txn, toku_lt_neg_infinity, toku_lt_infinity, LOCK_REQUEST_WRITE);
// commented out code for log suppression and recovery suppression.
// TODO: figure out right thing to do with this code.
#if 0
if (r==0 && !just_lock &&
!toku_brt_is_recovery_logging_suppressed(db->i->brt) &&
toku_brt_is_empty_fast(db->i->brt)
) {
//Try to suppress both rollback and recovery logs
DB_LOADER *loader;
DB *dbs[1] = {db};
uint32_t db_flags[1] = {DB_NOOVERWRITE};
uint32_t dbt_flags[1] = {0};
uint32_t loader_flags = DB_PRELOCKED_WRITE; //Don't recursively prelock
DB_ENV *env = db->dbenv;
DB_TXN *child = NULL;
{
// begin child
int rt = toku_txn_begin(env, txn, &child, DB_TXN_NOSYNC, 1, true);
assert(rt==0);
}
toku_ydb_unlock(); //Cannot hold ydb lock when creating loader
int r_loader = env->create_loader(env, child, &loader, NULL, 1, dbs, db_flags, dbt_flags, loader_flags);
if (r_loader==0) {
r_loader = loader->set_error_callback(loader, NULL, NULL);
assert(r_loader==0);
r_loader = loader->set_poll_function(loader, NULL, NULL);
assert(r_loader==0);
// close the loader
r_loader = loader->close(loader);
if (r_loader==0) {
toku_brt_suppress_recovery_logs(db->i->brt, db_txn_struct_i(child)->tokutxn);
}
}
else if (r_loader != DB_LOCK_NOTGRANTED) {
//Lock not granted is not an error.
//It just means we cannot use the loader optimization.
assert(r==0);
r = r_loader;
}
if (r_loader == 0) { // commit
r = locked_txn_commit(child, 0);
assert(r==0);
STATUS_VALUE(YDB_LAYER_LOGSUPPRESS)++; // accountability
}
else { // abort
r = locked_txn_abort(child);
assert(r==0);
STATUS_VALUE(YDB_LAYER_LOGSUPPRESS_FAIL)++; // accountability
}
toku_ydb_lock(); //Reaquire ydb lock.
}
#endif
return r;
}
......@@ -766,7 +711,7 @@ toku_db_truncate(DB *db, DB_TXN *txn, u_int32_t *row_count, u_int32_t flags) {
if (r != 0) {
return r;
}
r = toku_db_pre_acquire_table_lock(db, txn, TRUE);
r = toku_db_pre_acquire_table_lock(db, txn);
if (r != 0) {
return r;
}
......@@ -1003,7 +948,7 @@ toku_db_verify_with_progress(DB *db, int (*progress_callback)(void *extra, float
static int
db_pre_acquire_table_lock(DB *db, DB_TXN *txn) {
return toku_db_pre_acquire_table_lock(db, txn, TRUE);
return toku_db_pre_acquire_table_lock(db, txn);
}
int toku_close_db_internal (DB * db, bool oplsn_valid, LSN oplsn) {
......
......@@ -37,7 +37,7 @@ toku_db_get_compare_fun(DB* db) {
int toku_db_pre_acquire_fileops_lock(DB *db, DB_TXN *txn);
int db_open_iname(DB * db, DB_TXN * txn, const char *iname, u_int32_t flags, int mode);
int toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn, BOOL just_lock);
int toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn);
int toku_db_get (DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags);
int toku_db_create(DB ** db, DB_ENV * env, u_int32_t flags);
int toku_db_close(DB * db, u_int32_t flags, bool oplsn_valid, LSN oplsn);
......
......@@ -281,7 +281,7 @@ toku_db_update_broadcast(DB *db, DB_TXN *txn,
BOOL do_locking = (db->i->lt && !(lock_flags & DB_PRELOCKED_WRITE));
if (do_locking) {
r = toku_db_pre_acquire_table_lock(db, txn, TRUE);
r = toku_db_pre_acquire_table_lock(db, txn);
if (r != 0) { goto cleanup; }
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment