Commit 676b2308 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:1078], check in savepoint implementation to main line

git-svn-id: file:///svn/mysql/tokudb-engine/tokudb-engine@21825 c7de825b-a66e-492c-adef-691d508d4ae1
parent 02638f52
...@@ -5217,7 +5217,7 @@ int ha_tokudb::create_txn(THD* thd, tokudb_trx_data* trx) { ...@@ -5217,7 +5217,7 @@ int ha_tokudb::create_txn(THD* thd, tokudb_trx_data* trx) {
goto cleanup; goto cleanup;
} }
if (tokudb_debug & TOKUDB_DEBUG_TXN) { if (tokudb_debug & TOKUDB_DEBUG_TXN) {
TOKUDB_TRACE("master:%p\n", trx->all); TOKUDB_TRACE("just created master:%p\n", trx->all);
} }
trx->sp_level = trx->all; trx->sp_level = trx->all;
trans_register_ha(thd, TRUE, tokudb_hton); trans_register_ha(thd, TRUE, tokudb_hton);
...@@ -5240,8 +5240,9 @@ int ha_tokudb::create_txn(THD* thd, tokudb_trx_data* trx) { ...@@ -5240,8 +5240,9 @@ int ha_tokudb::create_txn(THD* thd, tokudb_trx_data* trx) {
trx->tokudb_lock_count--; // We didn't get the lock trx->tokudb_lock_count--; // We didn't get the lock
goto cleanup; goto cleanup;
} }
trx->sub_sp_level = trx->stmt;
if (tokudb_debug & TOKUDB_DEBUG_TXN) { if (tokudb_debug & TOKUDB_DEBUG_TXN) {
TOKUDB_TRACE("stmt:%p:%p\n", trx->sp_level, trx->stmt); TOKUDB_TRACE("just created stmt:%p:%p\n", trx->sp_level, trx->stmt);
} }
reset_stmt_progress(&trx->stmt_progress); reset_stmt_progress(&trx->stmt_progress);
trans_register_ha(thd, FALSE, tokudb_hton); trans_register_ha(thd, FALSE, tokudb_hton);
...@@ -5293,7 +5294,8 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) { ...@@ -5293,7 +5294,8 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) {
goto cleanup; goto cleanup;
} }
} }
transaction = trx->stmt; assert(thd->in_sub_stmt == 0);
transaction = trx->sub_sp_level;
} }
else { else {
lock.type = TL_UNLOCK; // Unlocked lock.type = TL_UNLOCK; // Unlocked
...@@ -5323,6 +5325,7 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) { ...@@ -5323,6 +5325,7 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) {
if (tokudb_debug & TOKUDB_DEBUG_TXN) if (tokudb_debug & TOKUDB_DEBUG_TXN)
TOKUDB_TRACE("commit:%p:%d\n", trx->stmt, error); TOKUDB_TRACE("commit:%p:%d\n", trx->stmt, error);
trx->stmt = NULL; trx->stmt = NULL;
trx->sub_sp_level = NULL;
} }
} }
transaction = NULL; transaction = NULL;
...@@ -5341,7 +5344,7 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) { ...@@ -5341,7 +5344,7 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) {
*/ */
int ha_tokudb::start_stmt(THD * thd, thr_lock_type lock_type) { int ha_tokudb::start_stmt(THD * thd, thr_lock_type lock_type) {
TOKUDB_DBUG_ENTER("ha_tokudb::start_stmt"); TOKUDB_DBUG_ENTER("ha_tokudb::start_stmt cmd=%d %d", thd_sql_command(thd), lock_type);
int error = 0; int error = 0;
...@@ -5353,12 +5356,16 @@ int ha_tokudb::start_stmt(THD * thd, thr_lock_type lock_type) { ...@@ -5353,12 +5356,16 @@ int ha_tokudb::start_stmt(THD * thd, thr_lock_type lock_type) {
and there could be many bdb tables referenced in the query and there could be many bdb tables referenced in the query
*/ */
if (!trx->stmt) { if (!trx->stmt) {
DBUG_PRINT("trans", ("starting transaction stmt"));
error = create_txn(thd, trx); error = create_txn(thd, trx);
if (error) { if (error) {
goto cleanup; goto cleanup;
} }
} }
else {
if (tokudb_debug & TOKUDB_DEBUG_TXN) {
TOKUDB_TRACE("trx->stmt already existed\n");
}
}
// //
// we know we are in lock tables // we know we are in lock tables
// attempt to grab a table lock // attempt to grab a table lock
...@@ -5370,15 +5377,16 @@ int ha_tokudb::start_stmt(THD * thd, thr_lock_type lock_type) { ...@@ -5370,15 +5377,16 @@ int ha_tokudb::start_stmt(THD * thd, thr_lock_type lock_type) {
// potentially grab some locks but not all is ok. // potentially grab some locks but not all is ok.
// //
if (lock.type <= TL_READ_NO_INSERT) { if (lock.type <= TL_READ_NO_INSERT) {
acquire_table_lock(trx->stmt,lock_read); acquire_table_lock(trx->sub_sp_level,lock_read);
} }
else { else {
acquire_table_lock(trx->stmt,lock_write); acquire_table_lock(trx->sub_sp_level,lock_write);
} }
if (added_rows > deleted_rows) { if (added_rows > deleted_rows) {
share->rows_from_locked_table = added_rows - deleted_rows; share->rows_from_locked_table = added_rows - deleted_rows;
} }
transaction = trx->stmt; transaction = trx->sub_sp_level;
trans_register_ha(thd, FALSE, tokudb_hton);
cleanup: cleanup:
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
...@@ -6731,7 +6739,7 @@ int ha_tokudb::optimize(THD * thd, HA_CHECK_OPT * check_opt) { ...@@ -6731,7 +6739,7 @@ int ha_tokudb::optimize(THD * thd, HA_CHECK_OPT * check_opt) {
// in order to get a valid transaction // in order to get a valid transaction
// this is a bit hacky, but it is the best we have right now // this is a bit hacky, but it is the best we have right now
// //
txn = trx->stmt ? trx->stmt : trx->sp_level; txn = trx->sub_sp_level ? trx->sub_sp_level : trx->sp_level;
if (txn == NULL) { if (txn == NULL) {
error = db_env->txn_begin(db_env, NULL, &txn, 0); error = db_env->txn_begin(db_env, NULL, &txn, 0);
if (error) { if (error) {
......
...@@ -98,6 +98,7 @@ typedef struct st_tokudb_trx_data { ...@@ -98,6 +98,7 @@ typedef struct st_tokudb_trx_data {
DB_TXN *all; DB_TXN *all;
DB_TXN *stmt; DB_TXN *stmt;
DB_TXN *sp_level; DB_TXN *sp_level;
DB_TXN *sub_sp_level;
uint tokudb_lock_count; uint tokudb_lock_count;
tokudb_stmt_progress stmt_progress; tokudb_stmt_progress stmt_progress;
bool checkpoint_lock_taken; bool checkpoint_lock_taken;
......
...@@ -34,6 +34,13 @@ extern "C" { ...@@ -34,6 +34,13 @@ extern "C" {
#define TOKU_METADB_NAME "tokudb_meta" #define TOKU_METADB_NAME "tokudb_meta"
typedef struct savepoint_info {
DB_TXN* txn;
tokudb_trx_data* trx;
bool in_sub_stmt;
} *SP_INFO, SP_INFO_T;
static inline void *thd_data_get(THD *thd, int slot) { static inline void *thd_data_get(THD *thd, int slot) {
return thd->ha_data[slot].ha_ptr; return thd->ha_data[slot].ha_ptr;
} }
...@@ -109,11 +116,9 @@ static int tokudb_close_connection(handlerton * hton, THD * thd); ...@@ -109,11 +116,9 @@ static int tokudb_close_connection(handlerton * hton, THD * thd);
static int tokudb_commit(handlerton * hton, THD * thd, bool all); static int tokudb_commit(handlerton * hton, THD * thd, bool all);
static int tokudb_rollback(handlerton * hton, THD * thd, bool all); static int tokudb_rollback(handlerton * hton, THD * thd, bool all);
static uint tokudb_alter_table_flags(uint flags); static uint tokudb_alter_table_flags(uint flags);
#if 0
static int tokudb_rollback_to_savepoint(handlerton * hton, THD * thd, void *savepoint); static int tokudb_rollback_to_savepoint(handlerton * hton, THD * thd, void *savepoint);
static int tokudb_savepoint(handlerton * hton, THD * thd, void *savepoint); static int tokudb_savepoint(handlerton * hton, THD * thd, void *savepoint);
static int tokudb_release_savepoint(handlerton * hton, THD * thd, void *savepoint); static int tokudb_release_savepoint(handlerton * hton, THD * thd, void *savepoint);
#endif
handlerton *tokudb_hton; handlerton *tokudb_hton;
const char *ha_tokudb_ext = ".tokudb"; const char *ha_tokudb_ext = ".tokudb";
...@@ -195,12 +200,12 @@ static int tokudb_init_func(void *p) { ...@@ -195,12 +200,12 @@ static int tokudb_init_func(void *p) {
tokudb_hton->create = tokudb_create_handler; tokudb_hton->create = tokudb_create_handler;
tokudb_hton->close_connection = tokudb_close_connection; tokudb_hton->close_connection = tokudb_close_connection;
#if 0
tokudb_hton->savepoint_offset = sizeof(DB_TXN *); tokudb_hton->savepoint_offset = sizeof(SP_INFO_T);
tokudb_hton->savepoint_set = tokudb_savepoint; tokudb_hton->savepoint_set = tokudb_savepoint;
tokudb_hton->savepoint_rollback = tokudb_rollback_to_savepoint; tokudb_hton->savepoint_rollback = tokudb_rollback_to_savepoint;
tokudb_hton->savepoint_release = tokudb_release_savepoint; tokudb_hton->savepoint_release = tokudb_release_savepoint;
#endif
tokudb_hton->commit = tokudb_commit; tokudb_hton->commit = tokudb_commit;
tokudb_hton->rollback = tokudb_rollback; tokudb_hton->rollback = tokudb_rollback;
tokudb_hton->panic = tokudb_end; tokudb_hton->panic = tokudb_end;
...@@ -590,7 +595,7 @@ static int tokudb_commit(handlerton * hton, THD * thd, bool all) { ...@@ -590,7 +595,7 @@ static int tokudb_commit(handlerton * hton, THD * thd, bool all) {
DB_TXN **txn = all ? &trx->all : &trx->stmt; DB_TXN **txn = all ? &trx->all : &trx->stmt;
if (*txn) { if (*txn) {
if (tokudb_debug & TOKUDB_DEBUG_TXN) { if (tokudb_debug & TOKUDB_DEBUG_TXN) {
TOKUDB_TRACE("commit:%d:%p\n", all, *txn); TOKUDB_TRACE("doing txn commit:%d:%p\n", all, *txn);
} }
commit_txn_with_progress(*txn, syncflag, thd); commit_txn_with_progress(*txn, syncflag, thd);
if (*txn == trx->sp_level) { if (*txn == trx->sp_level) {
...@@ -599,7 +604,7 @@ static int tokudb_commit(handlerton * hton, THD * thd, bool all) { ...@@ -599,7 +604,7 @@ static int tokudb_commit(handlerton * hton, THD * thd, bool all) {
*txn = 0; *txn = 0;
} }
else if (tokudb_debug & TOKUDB_DEBUG_TXN) { else if (tokudb_debug & TOKUDB_DEBUG_TXN) {
TOKUDB_TRACE("commit0\n"); TOKUDB_TRACE("nothing to commit %d\n", all);
} }
reset_stmt_progress(&trx->stmt_progress); reset_stmt_progress(&trx->stmt_progress);
TOKUDB_DBUG_RETURN(0); TOKUDB_DBUG_RETURN(0);
...@@ -629,27 +634,51 @@ static int tokudb_rollback(handlerton * hton, THD * thd, bool all) { ...@@ -629,27 +634,51 @@ static int tokudb_rollback(handlerton * hton, THD * thd, bool all) {
TOKUDB_DBUG_RETURN(0); TOKUDB_DBUG_RETURN(0);
} }
#if 0
static int tokudb_savepoint(handlerton * hton, THD * thd, void *savepoint) { static int tokudb_savepoint(handlerton * hton, THD * thd, void *savepoint) {
TOKUDB_DBUG_ENTER("tokudb_savepoint"); TOKUDB_DBUG_ENTER("tokudb_savepoint");
int error; int error;
DB_TXN **save_txn = (DB_TXN **) savepoint; SP_INFO save_info = (SP_INFO)savepoint;
tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(thd, hton->slot); tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(thd, hton->slot);
if (!(error = db_env->txn_begin(db_env, trx->sp_level, save_txn, 0))) { if (thd->in_sub_stmt) {
trx->sp_level = *save_txn; assert(trx->stmt);
error = db_env->txn_begin(db_env, trx->sub_sp_level, &(save_info->txn), 0);
if (error) {
goto cleanup;
}
trx->sub_sp_level = save_info->txn;
save_info->in_sub_stmt = true;
} }
else {
error = db_env->txn_begin(db_env, trx->sp_level, &(save_info->txn), 0);
if (error) {
goto cleanup;
}
trx->sp_level = save_info->txn;
save_info->in_sub_stmt = false;
}
save_info->trx = trx;
error = 0;
cleanup:
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
static int tokudb_rollback_to_savepoint(handlerton * hton, THD * thd, void *savepoint) { static int tokudb_rollback_to_savepoint(handlerton * hton, THD * thd, void *savepoint) {
TOKUDB_DBUG_ENTER("tokudb_rollback_to_savepoint"); TOKUDB_DBUG_ENTER("tokudb_rollback_to_savepoint");
int error; int error;
DB_TXN *parent, **save_txn = (DB_TXN **) savepoint; SP_INFO save_info = (SP_INFO)savepoint;
DB_TXN* parent = NULL;
DB_TXN* txn_to_rollback = save_info->txn;
tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(thd, hton->slot); tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(thd, hton->slot);
parent = (*save_txn)->parent; parent = txn_to_rollback->parent;
if (!(error = (*save_txn)->abort(*save_txn))) { if (!(error = txn_to_rollback->abort(txn_to_rollback))) {
if (save_info->in_sub_stmt) {
trx->sub_sp_level = parent;
}
else {
trx->sp_level = parent; trx->sp_level = parent;
}
error = tokudb_savepoint(hton, thd, savepoint); error = tokudb_savepoint(hton, thd, savepoint);
} }
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
...@@ -658,17 +687,25 @@ static int tokudb_rollback_to_savepoint(handlerton * hton, THD * thd, void *save ...@@ -658,17 +687,25 @@ static int tokudb_rollback_to_savepoint(handlerton * hton, THD * thd, void *save
static int tokudb_release_savepoint(handlerton * hton, THD * thd, void *savepoint) { static int tokudb_release_savepoint(handlerton * hton, THD * thd, void *savepoint) {
TOKUDB_DBUG_ENTER("tokudb_release_savepoint"); TOKUDB_DBUG_ENTER("tokudb_release_savepoint");
int error; int error;
DB_TXN *parent, **save_txn = (DB_TXN **) savepoint;
SP_INFO save_info = (SP_INFO)savepoint;
DB_TXN* parent = NULL;
DB_TXN* txn_to_commit = save_info->txn;
tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(thd, hton->slot); tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(thd, hton->slot);
parent = (*save_txn)->parent; parent = txn_to_commit->parent;
if (!(error = (*save_txn)->commit(*save_txn, 0))) { if (!(error = txn_to_commit->commit(txn_to_commit, 0))) {
if (save_info->in_sub_stmt) {
trx->sub_sp_level = parent;
}
else {
trx->sp_level = parent; trx->sp_level = parent;
*save_txn = 0; }
save_info->txn = NULL;
} }
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
#endif
static int smart_dbt_do_nothing (DBT const *key, DBT const *row, void *context) { static int smart_dbt_do_nothing (DBT const *key, DBT const *row, void *context) {
return 0; return 0;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment