Commit e24c4917 authored by Zardosht Kasheff, committed by Yoni Fogel

[t:1979], merge to main line

git-svn-id: file:///svn/mysql/tokudb-engine/src@15772 c7de825b-a66e-492c-adef-691d508d4ae1
parent d82692c7
......@@ -500,7 +500,6 @@ ulonglong retrieve_auto_increment(uint16 type, uint32 offset,const uchar *record
}
inline uint get_null_offset(TABLE* table, Field* field) {
return (uint) ((uchar*) field->null_ptr - (uchar*) table->record[0]);
}
......@@ -813,18 +812,12 @@ const uchar* unpack_toku_field_blob(
static int add_table_to_metadata(const char *name, TABLE* table) {
static int add_table_to_metadata(const char *name, TABLE* table, DB_TXN* txn) {
int error = 0;
DBT key;
DBT val;
DB_TXN* txn = NULL;
uchar hidden_primary_key = (table->s->primary_key >= MAX_KEY);
error = db_env->txn_begin(db_env, 0, &txn, 0);
if (error) {
goto cleanup;
}
assert(txn);
bzero((void *)&key, sizeof(key));
bzero((void *)&val, sizeof(val));
......@@ -839,25 +832,14 @@ static int add_table_to_metadata(const char *name, TABLE* table) {
&val,
DB_YESOVERWRITE
);
cleanup:
if (txn) {
int r = !error ? txn->commit(txn,0) : txn->abort(txn);
assert(!r);
}
return error;
}
static int drop_table_from_metadata(const char *name) {
static int drop_table_from_metadata(const char *name, DB_TXN* txn) {
int error = 0;
DBT key;
DBT data;
DB_TXN* txn = NULL;
error = db_env->txn_begin(db_env, 0, &txn, 0);
if (error) {
goto cleanup;
}
assert(txn);
bzero((void *)&key, sizeof(key));
bzero((void *)&data, sizeof(data));
key.data = (void *)name;
......@@ -868,25 +850,15 @@ static int drop_table_from_metadata(const char *name) {
&key ,
DB_DELETE_ANY
);
cleanup:
if (txn) {
int r = !error ? txn->commit(txn,0) : txn->abort(txn);
assert(!r);
}
return error;
}
static int rename_table_in_metadata(const char *from, const char *to) {
static int rename_table_in_metadata(const char *from, const char *to, DB_TXN* txn) {
int error = 0;
DBT from_key;
DBT to_key;
DBT val;
DB_TXN* txn = NULL;
error = db_env->txn_begin(db_env, 0, &txn, 0);
if (error) {
goto cleanup;
}
assert(txn);
bzero((void *)&from_key, sizeof(from_key));
bzero((void *)&to_key, sizeof(to_key));
......@@ -933,10 +905,6 @@ static int rename_table_in_metadata(const char *from, const char *to) {
error = 0;
cleanup:
if (txn) {
int r = !error ? txn->commit(txn,0) : txn->abort(txn);
assert(!r);
}
my_free(val.data, MYF(MY_ALLOW_ZERO_PTR));
return error;
......@@ -976,7 +944,7 @@ static int check_table_in_metadata(const char *name, bool* table_found) {
cleanup:
if (txn) {
error = txn->commit(txn,0);
commit_txn(txn, 0);
}
pthread_mutex_unlock(&tokudb_meta_mutex);
return error;
......@@ -1051,18 +1019,103 @@ bool ha_tokudb::has_auto_increment_flag(uint* index) {
return ai_found;
}
int ha_tokudb::open_status_dictionary(DB** ptr, const char* name, DB_TXN* txn) {
int error;
char* newname = NULL;
uint open_mode = DB_THREAD;
newname = (char *)my_malloc(
get_max_dict_name_path_length(name),
MYF(MY_WME)
);
if (newname == NULL) {
error = ENOMEM;
goto cleanup;
}
make_name(newname, name, "status");
if (tokudb_debug & TOKUDB_DEBUG_OPEN) {
TOKUDB_TRACE("open:%s\n", newname);
}
error = db_create(ptr, db_env, 0);
if (error) { goto cleanup; }
(*ptr)->set_bt_compare((*ptr), tokudb_cmp_dbt_key);
error = (*ptr)->open((*ptr), txn, newname, NULL, DB_BTREE, open_mode, 0);
if (error) {
goto cleanup;
}
cleanup:
if (error) {
if (*ptr) {
(*ptr)->close(*ptr, 0);
*ptr = NULL;
}
}
my_free(newname, MYF(MY_ALLOW_ZERO_PTR));
return error;
}
int ha_tokudb::open_main_dictionary(const char* name, int mode, DB_TXN* txn) {
int error;
char* newname = NULL;
uint open_flags = (mode == O_RDONLY ? DB_RDONLY : 0) | DB_THREAD;
open_flags += DB_AUTO_COMMIT;
assert(share->file == NULL);
assert(share->key_file[primary_key] == NULL);
newname = (char *)my_malloc(
get_max_dict_name_path_length(name),
MYF(MY_WME|MY_ZEROFILL)
);
if (newname == NULL) {
error = ENOMEM;
goto exit;
}
make_name(newname, name, "main");
error = db_create(&share->file, db_env, 0);
if (error) {
goto exit;
}
share->key_file[primary_key] = share->file;
share->key_type[primary_key] = hidden_primary_key ? DB_YESOVERWRITE : DB_NOOVERWRITE;
//
// set comparison function for main.tokudb
//
share->file->set_bt_compare(share->file, tokudb_cmp_dbt_key);
error = share->file->open(share->file, txn, newname, NULL, DB_BTREE, open_flags, 0);
if (error) {
goto exit;
}
if (tokudb_debug & TOKUDB_DEBUG_OPEN) {
TOKUDB_TRACE("open:%s:file=%p\n", newname, share->file);
}
exit:
if (error) {
if (share->file) {
share->file->close(
share->file,
0
);
share->file = NULL;
share->key_file[primary_key] = NULL;
}
}
my_free(newname, MYF(MY_ALLOW_ZERO_PTR));
return error;
}
//
// Open a secondary dictionary: the key is the secondary index key, the data is the primary key
//
int ha_tokudb::open_secondary_table(DB** ptr, KEY* key_info, const char* name, int mode, u_int32_t* key_type) {
int ha_tokudb::open_secondary_dictionary(DB** ptr, KEY* key_info, const char* name, int mode, u_int32_t* key_type, DB_TXN* txn) {
int error = ENOSYS;
char dict_name[MAX_DICT_NAME_LEN];
char name_buff[FN_REFLEN];
uint open_flags = (mode == O_RDONLY ? DB_RDONLY : 0) | DB_THREAD;
char* newname = NULL;
char* fn_ret = NULL;
uint newname_len = 0;
sprintf(dict_name, "key-%s", key_info->name);
......@@ -1074,11 +1127,6 @@ int ha_tokudb::open_secondary_table(DB** ptr, KEY* key_info, const char* name, i
goto cleanup;
}
make_name(newname, name, dict_name);
fn_ret = fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME|MY_SAFE_PATH);
if (fn_ret == NULL) {
error = HA_ERR_INTERNAL_ERROR;
goto cleanup;
}
open_flags += DB_AUTO_COMMIT;
......@@ -1090,8 +1138,8 @@ int ha_tokudb::open_secondary_table(DB** ptr, KEY* key_info, const char* name, i
// TODO: make sure that with clustering keys, DB_YESOVERWRITE IS ALWAYS SET
//
*key_type = key_info->flags & HA_NOSAME ? DB_NOOVERWRITE : DB_YESOVERWRITE;
(*ptr)->set_bt_compare(*ptr, tokudb_cmp_dbt_key);
(*ptr)->set_bt_compare(*ptr, tokudb_cmp_dbt_key);
DBUG_PRINT("info", ("Setting DB_DUP+DB_DUPSORT for key %s\n", key_info->name));
//
// clustering keys are not DB_DUP, because their keys are unique (they have the PK embedded)
......@@ -1101,7 +1149,7 @@ int ha_tokudb::open_secondary_table(DB** ptr, KEY* key_info, const char* name, i
(*ptr)->set_dup_compare(*ptr, tokudb_cmp_dbt_data);
}
if ((error = (*ptr)->open(*ptr, 0, name_buff, NULL, DB_BTREE, open_flags, 0))) {
if ((error = (*ptr)->open(*ptr, txn, newname, NULL, DB_BTREE, open_flags, 0))) {
my_errno = error;
goto cleanup;
}
......@@ -1109,6 +1157,12 @@ int ha_tokudb::open_secondary_table(DB** ptr, KEY* key_info, const char* name, i
TOKUDB_TRACE("open:%s:file=%p\n", newname, *ptr);
}
cleanup:
if (error) {
if (*ptr) {
(*ptr)->close(*ptr, 0);
*ptr = NULL;
}
}
my_free(newname, MYF(MY_ALLOW_ZERO_PTR));
return error;
}
......@@ -1175,15 +1229,10 @@ int ha_tokudb::initialize_share(
)
{
int error = 0;
char* newname = NULL;
char name_buff[FN_REFLEN];
char* fn_ret = NULL;
u_int64_t num_rows = 0;
u_int32_t curr_blob_field_index = 0;
u_int32_t max_var_bytes = 0;
bool table_exists;
uint open_flags = (mode == O_RDONLY ? DB_RDONLY : 0) | DB_THREAD;
open_flags += DB_AUTO_COMMIT;
DBUG_PRINT("info", ("share->use_count %u", share->use_count));
table_exists = true;
......@@ -1198,22 +1247,6 @@ int ha_tokudb::initialize_share(
goto exit;
}
newname = (char *)my_malloc(
get_max_dict_name_path_length(name),
MYF(MY_WME|MY_ZEROFILL)
);
if (newname == NULL) {
error = ENOMEM;
goto exit;
}
make_name(newname, name, "main");
fn_ret = fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME|MY_SAFE_PATH);
if (fn_ret == NULL) {
error = HA_ERR_INTERNAL_ERROR;
goto exit;
}
//
// fill in the field lengths. 0 means it is a variable sized field length
// fill in length_bytes, 0 means it is fixed or blob
......@@ -1306,37 +1339,19 @@ int ha_tokudb::initialize_share(
}
error = db_create(&share->file, db_env, 0);
if (error) {
goto exit;
}
share->key_file[primary_key] = share->file;
share->key_type[primary_key] = hidden_primary_key ? DB_YESOVERWRITE : DB_NOOVERWRITE;
//
// set comparison function for main.tokudb
//
share->file->set_bt_compare(share->file, tokudb_cmp_dbt_key);
error = share->file->open(share->file, 0, name_buff, NULL, DB_BTREE, open_flags, 0);
if (error) {
goto exit;
}
error = open_main_dictionary(name, mode, NULL);
if (error) { goto exit; }
if (tokudb_debug & TOKUDB_DEBUG_OPEN) {
TOKUDB_TRACE("open:%s:file=%p\n", newname, share->file);
}
/* Open other keys; These are part of the share structure */
for (uint i = 0; i < table_share->keys; i++) {
if (i != primary_key) {
error = open_secondary_table(
error = open_secondary_dictionary(
&share->key_file[i],
&table_share->key_info[i],
name,
mode,
&share->key_type[i]
&share->key_type[i],
NULL
);
if (error) {
goto exit;
......@@ -1394,7 +1409,6 @@ int ha_tokudb::initialize_share(
error = 0;
exit:
my_free(newname, MYF(MY_ALLOW_ZERO_PTR));
return error;
}
......@@ -1574,31 +1588,87 @@ cleanup:
crsr = NULL;
}
if (do_commit) {
transaction->commit(transaction, 0);
commit_txn(transaction, 0);
transaction = NULL;
}
return error;
}
int ha_tokudb::write_to_status(DB* db, HA_METADATA_KEY curr_key_data, void* data, uint size ){
return write_metadata(db, &curr_key_data, sizeof(curr_key_data), data, size);
int ha_tokudb::write_to_status(DB* db, HA_METADATA_KEY curr_key_data, void* data, uint size, DB_TXN* txn ){
return write_metadata(db, &curr_key_data, sizeof(curr_key_data), data, size, txn);
}
int ha_tokudb::remove_metadata(DB* db, void* key_data, uint key_size, DB_TXN* transaction){
int error;
DBT key;
DB_TXN* txn = NULL;
bool do_commit = false;
//
// transaction to be used for putting metadata into status.tokudb
//
error = db_env->txn_begin(db_env, 0, &txn, 0);
if (error) {
goto cleanup;
}
//
// transaction to be used for putting metadata into status.tokudb
//
if (transaction == NULL) {
error = db_env->txn_begin(db_env, 0, &txn, 0);
if (error) {
goto cleanup;
}
do_commit = true;
}
else {
txn = transaction;
}
bzero(&key, sizeof(key));
key.data = key_data;
key.size = key_size;
error = db->del(db, txn, &key, DB_DELETE_ANY);
if (error) {
goto cleanup;
}
error = 0;
cleanup:
if (do_commit && txn) {
if (!error) {
commit_txn(txn, DB_TXN_NOSYNC);
}
else {
abort_txn(txn);
}
}
return error;
}
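remove_metadata above and write_metadata below share an optional-transaction idiom: use the caller's transaction if one is supplied, otherwise begin, and later commit or abort, a private one. A minimal sketch of the idiom (the function name is hypothetical):

static int do_work_with_optional_txn(DB_ENV* env, DB_TXN* transaction) {
    int error = 0;
    DB_TXN* txn = NULL;
    bool do_commit = false;        // true iff this function owns txn
    if (transaction == NULL) {
        error = env->txn_begin(env, 0, &txn, 0);
        if (error) { goto cleanup; }
        do_commit = true;
    }
    else {
        txn = transaction;         // the caller commits or aborts
    }
    /* ... do the actual work under txn ... */
cleanup:
    if (do_commit && txn) {
        if (!error) { commit_txn(txn, DB_TXN_NOSYNC); }
        else { abort_txn(txn); }
    }
    return error;
}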
//
// helper function to write a piece of metadata in to status.tokudb
//
int ha_tokudb::write_metadata(DB* db, void* key_data, uint key_size, void* val_data, uint val_size ){
int ha_tokudb::write_metadata(DB* db, void* key_data, uint key_size, void* val_data, uint val_size, DB_TXN* transaction ){
int error;
DBT key;
DBT value;
DB_TXN* txn = NULL;
bool do_commit = false;
//
// transaction to be used for putting metadata into status.tokudb
//
error = db_env->txn_begin(db_env, 0, &txn, 0);
if (error) {
goto cleanup;
if (transaction == NULL) {
error = db_env->txn_begin(db_env, 0, &txn, 0);
if (error) {
goto cleanup;
}
do_commit = true;
}
else {
txn = transaction;
}
bzero(&key, sizeof(key));
......@@ -1614,12 +1684,12 @@ int ha_tokudb::write_metadata(DB* db, void* key_data, uint key_size, void* val_d
error = 0;
cleanup:
if (txn) {
if (do_commit && txn) {
if (!error) {
txn->commit(txn, DB_TXN_NOSYNC);
commit_txn(txn, DB_TXN_NOSYNC);
}
else {
txn->abort(txn);
abort_txn(txn);
}
}
return error;
......@@ -1637,7 +1707,7 @@ cleanup:
//
//
int ha_tokudb::update_max_auto_inc(DB* db, ulonglong val){
return write_to_status(db,hatoku_max_ai,&val,sizeof(val));
return write_to_status(db,hatoku_max_ai,&val,sizeof(val), NULL);
}
//
......@@ -1651,8 +1721,8 @@ int ha_tokudb::update_max_auto_inc(DB* db, ulonglong val){
// 0 on success, error otherwise
//
//
int ha_tokudb::write_auto_inc_create(DB* db, ulonglong val){
return write_to_status(db,hatoku_ai_create_value,&val,sizeof(val));
int ha_tokudb::write_auto_inc_create(DB* db, ulonglong val, DB_TXN* txn){
return write_to_status(db,hatoku_ai_create_value,&val,sizeof(val), txn);
}
......@@ -2360,8 +2430,7 @@ int ha_tokudb::read_last(uint keynr) {
error = index_last(table->record[1]);
index_end();
if (do_commit) {
int r = transaction->commit(transaction, 0);
assert(r == 0);
commit_txn(transaction, 0);
transaction = NULL;
}
TOKUDB_DBUG_RETURN(error);
......@@ -2398,36 +2467,15 @@ int ha_tokudb::get_status() {
DBT key, value;
HA_METADATA_KEY curr_key;
int error;
char* newname = NULL;
char* fn_ret = NULL;
//
// open status.tokudb
//
if (!share->status_block) {
char name_buff[FN_REFLEN];
newname = (char *)my_malloc(
get_max_dict_name_path_length(share->table_name),
MYF(MY_WME)
error = open_status_dictionary(
&share->status_block,
share->table_name,
NULL
);
if (newname == NULL) {
error = ENOMEM;
goto cleanup;
}
make_name(newname, share->table_name, "status");
fn_ret = fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME|MY_SAFE_PATH);
if (fn_ret == NULL) {
error = HA_ERR_INTERNAL_ERROR;
goto cleanup;
}
uint open_mode = (((table->db_stat & HA_READ_ONLY) ? DB_RDONLY : 0)
| DB_THREAD);
if (tokudb_debug & TOKUDB_DEBUG_OPEN) {
TOKUDB_TRACE("open:%s\n", newname);
}
error = db_create(&share->status_block, db_env, 0);
if (error) { goto cleanup; }
error = share->status_block->open(share->status_block, NULL, name_buff, NULL, DB_BTREE, open_mode, 0);
if (error) {
goto cleanup;
}
......@@ -2444,64 +2492,56 @@ int ha_tokudb::get_status() {
error = db_env->txn_begin(db_env, 0, &txn, 0);
if (error) { goto cleanup; }
if (share->status_block) {
int error;
//
// get version
//
value.ulen = sizeof(share->version);
value.data = &share->version;
curr_key = hatoku_version;
error = share->status_block->get(
share->status_block,
txn,
&key,
&value,
0
);
if (error == DB_NOTFOUND) {
share->version = 0;
}
else if (error || value.size != sizeof(share->version)) {
if (error == 0) {
error = HA_ERR_INTERNAL_ERROR;
}
goto cleanup;
}
//
// get capabilities
//
curr_key = hatoku_capabilities;
value.ulen = sizeof(share->capabilities);
value.data = &share->capabilities;
error = share->status_block->get(
share->status_block,
txn,
&key,
&value,
0
);
if (error == DB_NOTFOUND) {
share->capabilities= 0;
assert(share->status_block);
//
// get version
//
value.ulen = sizeof(share->version);
value.data = &share->version;
curr_key = hatoku_version;
error = share->status_block->get(
share->status_block,
txn,
&key,
&value,
0
);
if (error == DB_NOTFOUND) {
share->version = 0;
}
else if (error || value.size != sizeof(share->version)) {
if (error == 0) {
error = HA_ERR_INTERNAL_ERROR;
}
else if (error || value.size != sizeof(share->version)) {
if (error == 0) {
error = HA_ERR_INTERNAL_ERROR;
}
goto cleanup;
goto cleanup;
}
//
// get capabilities
//
curr_key = hatoku_capabilities;
value.ulen = sizeof(share->capabilities);
value.data = &share->capabilities;
error = share->status_block->get(
share->status_block,
txn,
&key,
&value,
0
);
if (error == DB_NOTFOUND) {
share->capabilities= 0;
}
else if (error || value.size != sizeof(share->version)) {
if (error == 0) {
error = HA_ERR_INTERNAL_ERROR;
}
goto cleanup;
}
error = 0;
cleanup:
if (txn) {
txn->commit(txn,0);
}
my_free(newname, MYF(MY_ALLOW_ZERO_PTR));
if (error) {
if (share->status_block) {
share->status_block->close(share->status_block, 0);
share->status_block = NULL;
}
commit_txn(txn,0);
}
TOKUDB_DBUG_RETURN(error);
}
......@@ -2598,8 +2638,7 @@ cleanup:
tmp_cursor = NULL;
}
if (txn) {
int r = txn->commit(txn, 0);
assert(r == 0);
commit_txn(txn, 0);
txn = NULL;
}
return ret_val;
......@@ -2666,13 +2705,6 @@ int ha_tokudb::write_row(uchar * record) {
is_replace_into = (thd_sql_command(thd) == SQLCOM_REPLACE) ||
(thd_sql_command(thd) == SQLCOM_REPLACE_SELECT);
//
// this can only fail if we have not opened the environment
// yet. I want to assert that rather than check for the error
//
error = db_env->checkpointing_begin_atomic_operation(db_env);
assert(!error);
//
// some crap that needs to be done because MySQL does not properly abstract
// this work away from us, namely filling in auto increment and setting auto timestamp
......@@ -2835,16 +2867,12 @@ cleanup:
// nothing we can do about it anyway and it is not what
// we want to return.
if (error) {
sub_trans->abort(sub_trans);
abort_txn(sub_trans);
}
else {
error = sub_trans->commit(sub_trans, DB_TXN_NOSYNC);
commit_txn(sub_trans, DB_TXN_NOSYNC);
}
}
{
int r = db_env->checkpointing_end_atomic_operation(db_env);
assert(r==0);
}
TOKUDB_DBUG_RETURN(error);
}
......@@ -2930,13 +2958,6 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
DB_TXN* txn = NULL;
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
//
// this can only fail if we have not opened the environment
// yet. I want to assert that rather than check for the error
//
error = db_env->checkpointing_begin_atomic_operation(db_env);
assert(!error);
LINT_INIT(error);
statistic_increment(table->in_use->status_var.ha_update_count, &LOCK_status);
if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE) {
......@@ -3078,16 +3099,12 @@ cleanup:
// nothing we can do about it anyway and it is not what
// we want to return.
if (error) {
sub_trans->abort(sub_trans);
abort_txn(sub_trans);
}
else {
error = sub_trans->commit(sub_trans, DB_TXN_NOSYNC);
commit_txn(sub_trans, DB_TXN_NOSYNC);
}
}
{
int r = db_env->checkpointing_end_atomic_operation(db_env);
assert(r==0);
}
TOKUDB_DBUG_RETURN(error);
}
......@@ -3180,13 +3197,6 @@ int ha_tokudb::delete_row(const uchar * record) {
THD* thd = ha_thd();
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
//
// this can only fail if we have not opened the environment
// yet. I want to assert that rather than check for the error
//
error = db_env->checkpointing_begin_atomic_operation(db_env);
assert(!error);
statistic_increment(table->in_use->status_var.ha_delete_count, &LOCK_status);
create_dbt_key_from_table(&prim_key, primary_key, key_buff, record, &has_null);
......@@ -3205,10 +3215,6 @@ int ha_tokudb::delete_row(const uchar * record) {
trx->stmt_progress.deleted++;
track_progress(thd);
}
{
int r = db_env->checkpointing_end_atomic_operation(db_env);
assert(r==0);
}
TOKUDB_DBUG_RETURN(error);
}
......@@ -4303,7 +4309,7 @@ int ha_tokudb::info(uint flag) {
error = 0;
cleanup:
if (txn != NULL) {
txn->commit(txn, DB_TXN_NOSYNC);
commit_txn(txn, DB_TXN_NOSYNC);
txn = NULL;
}
TOKUDB_DBUG_RETURN(error);
......@@ -4521,7 +4527,7 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) {
We must in this case commit the work to keep the row locks
*/
DBUG_PRINT("trans", ("commiting non-updating transaction"));
error = trx->stmt->commit(trx->stmt, 0);
commit_txn(trx->stmt, 0);
reset_stmt_progress(&trx->stmt_progress);
if (tokudb_debug & TOKUDB_DEBUG_TXN)
TOKUDB_TRACE("commit:%p:%d\n", trx->stmt, error);
......@@ -4640,7 +4646,7 @@ int toku_dbt_up(DB*,
}
static int create_sub_table(const char *table_name, int flags , DBT* row_descriptor) {
static int create_sub_table(const char *table_name, int flags , DBT* row_descriptor, DB_TXN* txn) {
TOKUDB_DBUG_ENTER("create_sub_table");
int error;
DB *file = NULL;
......@@ -4662,157 +4668,20 @@ static int create_sub_table(const char *table_name, int flags , DBT* row_descrip
goto exit;
}
error = file->open(file, NULL, table_name, NULL, DB_BTREE, DB_THREAD | DB_CREATE, my_umask);
error = file->open(file, txn, table_name, NULL, DB_BTREE, DB_THREAD | DB_CREATE, my_umask);
if (error) {
DBUG_PRINT("error", ("Got error: %d when opening table '%s'", error, table_name));
goto exit;
}
file->close(file, 0);
error = 0;
exit:
if (error) {
if (file != NULL) {
(void) file->remove(file, table_name, NULL, 0);
}
if (file) {
file->close(file, 0);
}
TOKUDB_DBUG_RETURN(error);
}
static int mkdirpath(char *name, mode_t mode) {
char* parent = NULL;
int r = toku_os_mkdir(name, mode);
if (r == -1 && errno == ENOENT) {
parent = (char *)my_malloc(strlen(name)+1,MYF(MY_WME));
if (parent == NULL) {
r = ENOMEM;
goto cleanup;
}
strcpy(parent, name);
char *cp = strrchr(parent, '/');
if (cp) {
*cp = 0;
r = toku_os_mkdir(parent, 0755);
if (r == 0)
r = toku_os_mkdir(name, mode);
}
}
cleanup:
my_free(parent, MYF(MY_ALLOW_ZERO_PTR));
return r;
}
extern "C" {
#include <dirent.h>
}
static int rmall(const char *dname) {
int error = 0;
char* fname = NULL;
struct dirent *dirent = NULL;;
DIR *d = opendir(dname);
if (d == NULL) {
error = errno;
goto cleanup;
}
//
// we do two loops, first loop just removes all the .tokudb files
// second loop removes extraneous files
//
while ((dirent = readdir(d)) != 0) {
if (0 == strcmp(dirent->d_name, ".") || 0 == strcmp(dirent->d_name, ".."))
continue;
fname = (char *)my_malloc(strlen(dname) + 1 + strlen(dirent->d_name) + 1, MYF(MY_WME));
sprintf(fname, "%s/%s", dname, dirent->d_name);
if (dirent->d_type == DT_DIR) {
error = rmall(fname);
if (error) { goto cleanup; }
}
else {
//
// if clause checks if the file is a .tokudb file
//
if (strlen(fname) >= strlen (ha_tokudb_ext) &&
strcmp(fname + (strlen(fname) - strlen(ha_tokudb_ext)), ha_tokudb_ext) == 0)
{
if (tokudb_debug & TOKUDB_DEBUG_OPEN) {
TOKUDB_TRACE("removing:%s\n", fname);
}
//
// if this fails under low memory conditions, gracefully exit and return error
// user will be notified that something went wrong, and he will
// have to deal with it
//
DB* db = NULL;
error = db_create(&db, db_env, 0);
if (error) { goto cleanup; }
//
// it is ok to do db->remove on any .tokudb file, because any such
// file was created with db->open
//
error = db->remove(db, fname, NULL, 0);
if (error) { goto cleanup; }
}
else {
continue;
}
my_free(fname, MYF(MY_ALLOW_ZERO_PTR));
fname = NULL;
}
}
closedir(d);
d = NULL;
fname = NULL;
d = opendir(dname);
if (d == NULL) {
error = errno;
goto cleanup;
}
//
// second loop to remove extraneous files
//
while ((dirent = readdir(d)) != 0) {
if (0 == strcmp(dirent->d_name, ".") || 0 == strcmp(dirent->d_name, ".."))
continue;
fname = (char *)my_malloc(strlen(dname) + 1 + strlen(dirent->d_name) + 1, MYF(MY_WME));
sprintf(fname, "%s/%s", dname, dirent->d_name);
if (dirent->d_type == DT_DIR) {
error = rmall(fname);
if (error) { goto cleanup; }
}
else {
if (tokudb_debug & TOKUDB_DEBUG_OPEN) {
TOKUDB_TRACE("removing:%s\n", fname);
}
//
// Now we are removing files that are not .tokudb, we just delete it
//
error = unlink(fname);
if (error != 0) {
error = errno;
break;
}
my_free(fname, MYF(MY_ALLOW_ZERO_PTR));
fname = NULL;
}
}
closedir(d);
d = NULL;
error = rmdir(dname);
if (error != 0) {
error = errno;
goto cleanup;
}
cleanup:
return error;
}
void ha_tokudb::update_create_info(HA_CREATE_INFO* create_info) {
if (share->has_auto_inc) {
info(HA_STATUS_AUTO);
......@@ -4820,45 +4689,67 @@ void ha_tokudb::update_create_info(HA_CREATE_INFO* create_info) {
}
}
//
// Creates a new table
// Parameters:
// [in] name - table name
// [in] form - info on table, columns and indexes
// [in] create_info - more info on table, CURRENTLY UNUSED
// Returns:
// 0 on success
// error otherwise
// removes a key name from status.tokudb.
// needed when dropping indexes: status.tokudb must be kept in sync with the
// list of indexes, so that a later drop table does not attempt to remove
// dictionaries for indexes that were already dropped.
//
int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_info) {
TOKUDB_DBUG_ENTER("ha_tokudb::create");
char name_buff[FN_REFLEN];
int ha_tokudb::remove_key_name_from_status(DB* status_block, char* key_name, DB_TXN* txn) {
int error;
DB *status_block = NULL;
bool dir_path_made = false;
char* dirname = NULL;
char* newname = NULL;
DBT row_descriptor;
uchar* row_desc_buff = NULL;
KEY* prim_key = NULL;
char* fn_ret = NULL;
uint version;
uint capabilities;
pthread_mutex_lock(&tokudb_meta_mutex);
bzero(&row_descriptor, sizeof(row_descriptor));
row_desc_buff = (uchar *)my_malloc(2*(form->s->fields * 6)+10 ,MYF(MY_WME));
if (row_desc_buff == NULL){ error = ENOMEM; goto cleanup;}
uchar status_key_info[FN_REFLEN + sizeof(HA_METADATA_KEY)];
HA_METADATA_KEY md_key = hatoku_key_name;
memcpy(status_key_info, &md_key, sizeof(HA_METADATA_KEY));
//
// put index name in status.tokudb
//
memcpy(
status_key_info + sizeof(HA_METADATA_KEY),
key_name,
strlen(key_name) + 1
);
error = remove_metadata(
status_block,
status_key_info,
sizeof(HA_METADATA_KEY) + strlen(key_name) + 1,
txn
);
return error;
}
dirname = (char *)my_malloc(get_max_dict_name_path_length(name),MYF(MY_WME));
if (dirname == NULL){ error = ENOMEM; goto cleanup;}
newname = (char *)my_malloc(get_max_dict_name_path_length(name),MYF(MY_WME));
if (newname == NULL){ error = ENOMEM; goto cleanup;}
primary_key = form->s->primary_key;
hidden_primary_key = (primary_key >= MAX_KEY) ? TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH : 0;
//
// writes the key name in status.tokudb, so that we may later delete or rename
// the dictionary associated with key_name
//
int ha_tokudb::write_key_name_to_status(DB* status_block, char* key_name, DB_TXN* txn) {
int error;
uchar status_key_info[FN_REFLEN + sizeof(HA_METADATA_KEY)];
HA_METADATA_KEY md_key = hatoku_key_name;
memcpy(status_key_info, &md_key, sizeof(HA_METADATA_KEY));
//
// put index name in status.tokudb
//
memcpy(
status_key_info + sizeof(HA_METADATA_KEY),
key_name,
strlen(key_name) + 1
);
error = write_metadata(
status_block,
status_key_info,
sizeof(HA_METADATA_KEY) + strlen(key_name) + 1,
NULL,
0,
txn
);
return error;
}
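Both status helpers above address an index by a composite key: a HA_METADATA_KEY tag (hatoku_key_name) followed by the NUL-terminated index name. A sketch of building and decoding such a key, with illustrative variable names:

// building the key, as write_key_name_to_status does:
uchar status_key_info[FN_REFLEN + sizeof(HA_METADATA_KEY)];
HA_METADATA_KEY md_key = hatoku_key_name;
memcpy(status_key_info, &md_key, sizeof(HA_METADATA_KEY));
memcpy(status_key_info + sizeof(HA_METADATA_KEY), key_name, strlen(key_name) + 1);
uint status_key_size = sizeof(HA_METADATA_KEY) + strlen(key_name) + 1;

// decoding it again, as delete_or_rename_table does while scanning its cursor:
HA_METADATA_KEY mk = *(HA_METADATA_KEY *)curr_key.data;
if (mk == hatoku_key_name) {
    char* index_name = (char *)curr_key.data + sizeof(HA_METADATA_KEY);
    // the dictionary to drop or rename is then "key-<index_name>"
}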
uint i;
//
// some tracing moved out of ha_tokudb::create, because ::create was getting cluttered
//
void ha_tokudb::trace_create_table_info(const char *name, TABLE * form) {
uint i;
//
// tracing information about what type of table we are creating
//
......@@ -4879,32 +4770,84 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in
}
}
}
}
//
// creates the dictionary for a secondary index with key description key_info, all using txn
//
int ha_tokudb::create_secondary_dictionary(const char* name, TABLE* form, KEY* key_info, DB_TXN* txn) {
int error;
DBT row_descriptor;
uchar* row_desc_buff = NULL;
char* newname = NULL;
KEY* prim_key = NULL;
char dict_name[MAX_DICT_NAME_LEN];
uint hpk= (form->s->primary_key >= MAX_KEY) ? TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH : 0;
int flags = (key_info->flags & HA_CLUSTERING) ? 0 : DB_DUP + DB_DUPSORT;
// a table is a directory of dictionaries
make_name(dirname, name, 0);
error = mkdirpath(dirname, 0777);
if (error != 0) {
error = errno;
goto cleanup;
}
dir_path_made = true;
bzero(&row_descriptor, sizeof(row_descriptor));
row_desc_buff = (uchar *)my_malloc(2*(form->s->fields * 6)+10 ,MYF(MY_WME));
if (row_desc_buff == NULL){ error = ENOMEM; goto cleanup;}
newname = (char *)my_malloc(get_max_dict_name_path_length(name),MYF(MY_WME));
if (newname == NULL){ error = ENOMEM; goto cleanup;}
sprintf(dict_name, "key-%s", key_info->name);
make_name(newname, name, dict_name);
prim_key = (hpk) ? NULL : &form->s->key_info[primary_key];
//
// setup the row descriptor
//
row_descriptor.data = row_desc_buff;
row_descriptor.size = create_toku_key_descriptor(
row_desc_buff,
false,
key_info->flags & HA_CLUSTERING,
key_info,
hpk,
prim_key
);
error = create_sub_table(newname, flags, &row_descriptor, txn);
cleanup:
my_free(newname, MYF(MY_ALLOW_ZERO_PTR));
my_free(row_desc_buff, MYF(MY_ALLOW_ZERO_PTR));
return error;
}
//
// create and close the main dictionary named "name" using table form, all within
// transaction txn.
//
int ha_tokudb::create_main_dictionary(const char* name, TABLE* form, DB_TXN* txn) {
int error;
DBT row_descriptor;
uchar* row_desc_buff = NULL;
char* newname = NULL;
KEY* prim_key = NULL;
uint hpk= (form->s->primary_key >= MAX_KEY) ? TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH : 0;
bzero(&row_descriptor, sizeof(row_descriptor));
row_desc_buff = (uchar *)my_malloc(2*(form->s->fields * 6)+10 ,MYF(MY_WME));
if (row_desc_buff == NULL){ error = ENOMEM; goto cleanup;}
newname = (char *)my_malloc(get_max_dict_name_path_length(name),MYF(MY_WME));
if (newname == NULL){ error = ENOMEM; goto cleanup;}
make_name(newname, name, "main");
fn_ret = fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME|MY_SAFE_PATH);
if (fn_ret == NULL) {
error = HA_ERR_INTERNAL_ERROR;
goto cleanup;
}
prim_key = (hpk) ? NULL : &form->s->key_info[primary_key];
//
// setup the row descriptor
//
prim_key = (hidden_primary_key) ? NULL : &form->s->key_info[primary_key];
row_descriptor.data = row_desc_buff;
row_descriptor.size = create_toku_key_descriptor(
row_desc_buff,
hidden_primary_key,
hpk,
false,
prim_key,
false,
......@@ -4912,96 +4855,106 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in
);
/* Create the main table that will hold the real rows */
error = create_sub_table(name_buff, 0, &row_descriptor);
if (tokudb_debug & TOKUDB_DEBUG_OPEN) {
TOKUDB_TRACE("create:%s:error=%d\n", newname, error);
}
if (error) {
goto cleanup;
}
error = create_sub_table(newname, 0, &row_descriptor, txn);
cleanup:
my_free(newname, MYF(MY_ALLOW_ZERO_PTR));
my_free(row_desc_buff, MYF(MY_ALLOW_ZERO_PTR));
return error;
}
//
// Creates a new table
// Parameters:
// [in] name - table name
// [in] form - info on table, columns and indexes
// [in] create_info - more info on table, CURRENTLY UNUSED
// Returns:
// 0 on success
// error otherwise
//
int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_info) {
TOKUDB_DBUG_ENTER("ha_tokudb::create");
int error;
DB *status_block = NULL;
KEY* prim_key = NULL;
uint version;
uint capabilities;
DB_TXN* txn = NULL;
char* newname = NULL;
pthread_mutex_lock(&tokudb_meta_mutex);
/* Create the keys */
char dict_name[MAX_DICT_NAME_LEN];
for (uint i = 0; i < form->s->keys; i++) {
if (i != primary_key) {
int flags = (form->s->key_info[i].flags & HA_CLUSTERING) ? 0 : DB_DUP + DB_DUPSORT;
sprintf(dict_name, "key-%s", form->s->key_info[i].name);
make_name(newname, name, dict_name);
fn_ret = fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME|MY_SAFE_PATH);
if (fn_ret == NULL) {
error = HA_ERR_INTERNAL_ERROR;
goto cleanup;
}
//
// setup the row descriptor
//
row_descriptor.size = create_toku_key_descriptor(
row_desc_buff,
false,
form->key_info[i].flags & HA_CLUSTERING,
&form->key_info[i],
hidden_primary_key,
prim_key
);
error = create_sub_table(name_buff, flags, &row_descriptor);
if (tokudb_debug & TOKUDB_DEBUG_OPEN) {
TOKUDB_TRACE("create:%s:flags=%ld:error=%d\n", newname, form->key_info[i].flags, error);
}
if (error) {
goto cleanup;
}
}
}
newname = (char *)my_malloc(get_max_dict_name_path_length(name),MYF(MY_WME));
if (newname == NULL){ error = ENOMEM; goto cleanup;}
error = db_create(&status_block, db_env, 0);
error = db_env->txn_begin(db_env, 0, &txn, 0);
if (error) { goto cleanup; }
primary_key = form->s->primary_key;
hidden_primary_key = (primary_key >= MAX_KEY) ? TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH : 0;
/* do some tracing */
trace_create_table_info(name,form);
/* Create status.tokudb and save relevant metadata */
make_name(newname, name, "status");
fn_ret = fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME|MY_SAFE_PATH);
if (fn_ret == NULL) {
error = HA_ERR_INTERNAL_ERROR;
goto cleanup;
}
error = status_block->open(status_block, NULL, name_buff, NULL, DB_BTREE, DB_CREATE, 0);
error = db_create(&status_block, db_env, 0);
if (error) { goto cleanup; }
status_block->set_bt_compare(status_block, tokudb_cmp_dbt_key);
error = status_block->open(status_block, txn, newname, NULL, DB_BTREE, DB_CREATE, 0);
if (error) { goto cleanup; }
version = HA_TOKU_VERSION;
capabilities = HA_TOKU_CAP;
error = write_to_status(status_block, hatoku_version,&version,sizeof(version));
error = write_to_status(status_block, hatoku_version,&version,sizeof(version), txn);
if (error) { goto cleanup; }
error = write_to_status(status_block, hatoku_capabilities,&capabilities,sizeof(capabilities));
error = write_to_status(status_block, hatoku_capabilities,&capabilities,sizeof(capabilities), txn);
if (error) { goto cleanup; }
error = write_auto_inc_create(status_block, create_info->auto_increment_value);
error = write_auto_inc_create(status_block, create_info->auto_increment_value, txn);
if (error) { goto cleanup; }
error = add_table_to_metadata(name, form);
error = create_main_dictionary(name, form, txn);
if (error) {
goto cleanup;
}
for (uint i = 0; i < form->s->keys; i++) {
if (i != primary_key) {
error = create_secondary_dictionary(name, form, &form->key_info[i], txn);
if (error) {
goto cleanup;
}
error = write_key_name_to_status(status_block, form->s->key_info[i].name, txn);
if (error) { goto cleanup; }
}
}
error = add_table_to_metadata(name, form, txn);
if (error) { goto cleanup; }
error = 0;
cleanup:
if (status_block != NULL) {
status_block->close(status_block, 0);
}
if (error && dir_path_made) {
rmall(dirname);
}
if (error) {
drop_table_from_metadata(name);
if (txn) {
if (error) {
abort_txn(txn);
}
else {
commit_txn(txn,0);
}
}
my_free(newname, MYF(MY_ALLOW_ZERO_PTR));
my_free(dirname, MYF(MY_ALLOW_ZERO_PTR));
my_free(row_desc_buff, MYF(MY_ALLOW_ZERO_PTR));
pthread_mutex_unlock(&tokudb_meta_mutex);
TOKUDB_DBUG_RETURN(error);
}
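The net effect of the rewritten create above is that table creation becomes one atomic transaction: a single abort undoes a partially created table, where the old code had to rmall() the table directory and patch the metadata on error. In outline, with all names from the code above:

// ha_tokudb::create, in outline:
//   txn_begin
//     create and open status.tokudb under txn
//     write_to_status(hatoku_version), write_to_status(hatoku_capabilities)
//     write_auto_inc_create(create_info->auto_increment_value)
//     create_main_dictionary(name, form, txn)
//     for each secondary key:
//       create_secondary_dictionary(name, form, &key_info[i], txn)
//       write_key_name_to_status(status_block, key_name, txn)
//     add_table_to_metadata(name, form, txn)
//   commit on success, abort on any error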
......@@ -5020,49 +4973,181 @@ int ha_tokudb::discard_or_import_tablespace(my_bool discard) {
//
// Drops table
// Parameters:
// [in] name - name of table to be deleted
// Returns:
// 0 on success
// error otherwise
// deletes from_name or renames from_name to to_name, all using transaction txn.
// is_delete specifies which we are doing
// is_key specifies whether it is a secondary index (in which case a "key-"
// prefix needs to be prepended to the dictionary name)
//
int ha_tokudb::delete_table(const char *name) {
TOKUDB_DBUG_ENTER("ha_tokudb::delete_table");
int ha_tokudb::delete_or_rename_dictionary( const char* from_name, const char* to_name, char* secondary_name, bool is_key, DB_TXN* txn, bool is_delete) {
int error;
char dict_name[MAX_DICT_NAME_LEN];
char* new_from_name = NULL;
char* new_to_name = NULL;
assert(txn);
new_from_name = (char *)my_malloc(
get_max_dict_name_path_length(from_name),
MYF(MY_WME)
);
if (new_from_name == NULL) {
error = ENOMEM;
goto cleanup;
}
if (!is_delete) {
assert(to_name);
new_to_name = (char *)my_malloc(
get_max_dict_name_path_length(to_name),
MYF(MY_WME)
);
if (new_to_name == NULL) {
error = ENOMEM;
goto cleanup;
}
}
if (is_key) {
sprintf(dict_name, "key-%s", secondary_name);
make_name(new_from_name, from_name, dict_name);
}
else {
make_name(new_from_name, from_name, secondary_name);
}
if (!is_delete) {
if (is_key) {
sprintf(dict_name, "key-%s", secondary_name);
make_name(new_to_name, to_name, dict_name);
}
else {
make_name(new_to_name, to_name, secondary_name);
}
}
if (is_delete) {
error = db_env->dbremove(db_env, txn, new_from_name, NULL, 0);
}
else {
error = db_env->dbrename(db_env, txn, new_from_name, NULL, new_to_name, 0);
}
if (error) { goto cleanup; }
cleanup:
my_free(new_from_name, MYF(MY_ALLOW_ZERO_PTR));
my_free(new_to_name, MYF(MY_ALLOW_ZERO_PTR));
return error;
}
//
// deletes or renames a table. if is_delete is true, then we delete, and to_name can be NULL
// if is_delete is false, then to_name must be non-NULL, as we are renaming the table.
//
int ha_tokudb::delete_or_rename_table (const char* from_name, const char* to_name, bool is_delete) {
int error;
char* newname = NULL;
DB* status_db = NULL;
DB* db_to_delete = NULL;
DBC* status_cursor = NULL;
DB_TXN* txn = NULL;
DBT curr_key;
DBT curr_val;
bzero(&curr_key, sizeof(curr_key));
bzero(&curr_val, sizeof(curr_val));
pthread_mutex_lock(&tokudb_meta_mutex);
error = db_env->txn_begin(db_env, 0, &txn, 0);
if (error) { goto cleanup; }
//
// this can only fail if we have not opened the environment
// yet. I want to assert that rather than check for the error
// modify metadata db
//
error = db_env->checkpointing_postpone(db_env);
assert(!error);
// remove all of the dictionaries in the table directory
error = drop_table_from_metadata(name);
if (error) {
goto cleanup;
if (is_delete) {
error = drop_table_from_metadata(from_name, txn);
}
newname = (char *)my_malloc(get_max_dict_name_path_length(name), MYF(MY_WME|MY_ZEROFILL));
if (newname == NULL) {
error = ENOMEM;
goto cleanup;
else {
error = rename_table_in_metadata(from_name, to_name, txn);
}
make_name(newname, name, 0);
error = rmall(newname);
if (error) { goto cleanup; }
//
// open status db,
// create cursor,
// for each name read out of there, create a db and delete or rename it
//
error = open_status_dictionary(&status_db, from_name, txn);
if (error) { goto cleanup; }
error = status_db->cursor(status_db, txn, &status_cursor, 0);
if (error) { goto cleanup; }
while (error != DB_NOTFOUND) {
error = status_cursor->c_get(
status_cursor,
&curr_key,
&curr_val,
DB_NEXT
);
if (error && error != DB_NOTFOUND) { goto cleanup; }
if (error == DB_NOTFOUND) { break; }
HA_METADATA_KEY mk = *(HA_METADATA_KEY *)curr_key.data;
if (mk != hatoku_key_name) {
continue;
}
error = delete_or_rename_dictionary(from_name, to_name, (char *)((char *)curr_key.data + sizeof(HA_METADATA_KEY)), true, txn, is_delete);
if (error) { goto cleanup; }
}
//
// delete or rename main.tokudb
//
error = delete_or_rename_dictionary(from_name, to_name, "main", false, txn, is_delete);
if (error) { goto cleanup; }
error = status_cursor->c_close(status_cursor);
if (error) { goto cleanup; }
status_cursor = NULL;
error = status_db->close(status_db, 0);
if (error) { goto cleanup; }
status_db = NULL;
//
// delete or rename status.tokudb
//
error = delete_or_rename_dictionary(from_name, to_name, "status", false, txn, is_delete);
if (error) { goto cleanup; }
my_errno = error;
cleanup:
{
int r;
r = db_env->checkpointing_resume(db_env);
assert(r==0);
if (status_cursor) {
status_cursor->c_close(status_cursor);
}
if (status_db) {
status_db->close(status_db, 0);
}
if (txn) {
if (error) {
abort_txn(txn);
}
else {
commit_txn(txn, 0);
}
}
my_free(newname, MYF(MY_ALLOW_ZERO_PTR));
pthread_mutex_unlock(&tokudb_meta_mutex);
TOKUDB_DBUG_RETURN(error);
return error;
}
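DROP TABLE and RENAME TABLE are now both driven off the status dictionary rather than off a filesystem scan (the rmall() directory walk this change deletes). In outline:

// delete_or_rename_table, in outline:
//   txn_begin
//   update the metadata db (drop_table_from_metadata or rename_table_in_metadata)
//   open status.tokudb, cursor over it:
//     each hatoku_key_name row names a secondary index whose dictionary
//     "key-<name>" is passed to db_env->dbremove or db_env->dbrename under txn
//   then delete or rename main.tokudb, then status.tokudb itself
//   commit on success, abort on any error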
//
// Drops table
// Parameters:
// [in] name - name of table to be deleted
// Returns:
// 0 on success
// error otherwise
//
int ha_tokudb::delete_table(const char *name) {
TOKUDB_DBUG_ENTER("ha_tokudb::delete_table");
TOKUDB_DBUG_RETURN(delete_or_rename_table(name, NULL, true));
}
......@@ -5078,50 +5163,7 @@ cleanup:
int ha_tokudb::rename_table(const char *from, const char *to) {
TOKUDB_DBUG_ENTER("%s %s %s", __FUNCTION__, from, to);
int error;
char* newfrom = NULL;
char* newto = NULL;
pthread_mutex_lock(&tokudb_meta_mutex);
//
// this can only fail if we have not opened the environment
// yet. I want to assert that rather than check for the error
//
error = db_env->checkpointing_postpone(db_env);
assert(!error);
int n = get_name_length(from) + NAME_CHAR_LEN;
newfrom = (char *)my_malloc(n,MYF(MY_WME));
if (newfrom == NULL){
error = ENOMEM;
goto cleanup;
}
make_name(newfrom, from, 0);
n = get_name_length(to) + NAME_CHAR_LEN;
newto = (char *)my_malloc(n,MYF(MY_WME));
if (newto == NULL){
error = ENOMEM;
goto cleanup;
}
make_name(newto, to, 0);
error = rename(newfrom, newto);
if (error != 0) {
error = my_errno = errno;
goto cleanup;
}
error = rename_table_in_metadata(from, to);
cleanup:
{
int r;
r = db_env->checkpointing_resume(db_env);
assert(r==0);
}
my_free(newfrom, MYF(MY_ALLOW_ZERO_PTR));
my_free(newto, MYF(MY_ALLOW_ZERO_PTR));
pthread_mutex_unlock(&tokudb_meta_mutex);
error = delete_or_rename_table(from, to, false);
TOKUDB_DBUG_RETURN(error);
}
......@@ -5377,7 +5419,7 @@ void ha_tokudb::init_auto_increment() {
share->auto_inc_create_value = 0;
}
txn->commit(txn,DB_TXN_NOSYNC);
commit_txn(txn, 0);
}
if (tokudb_debug & TOKUDB_DEBUG_AUTO_INCREMENT) {
TOKUDB_TRACE("init auto increment:%lld\n", share->last_auto_increment);
......@@ -5433,28 +5475,16 @@ bool ha_tokudb::is_auto_inc_singleton(){
//
int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
TOKUDB_DBUG_ENTER("ha_tokudb::add_index");
char name_buff[FN_REFLEN];
int error;
uint curr_index = 0;
DBC* tmp_cursor = NULL;
int cursor_ret_val = 0;
DBT current_primary_key;
DB_TXN* txn = NULL;
char* newname = NULL;
uint newname_len = 0;
uchar* tmp_key_buff = NULL;
uchar* tmp_prim_key_buff = NULL;
uchar* tmp_record = NULL;
THD* thd = ha_thd();
uchar* row_desc_buff = NULL;
DBT row_descriptor;
char* fn_ret = NULL;
bzero(&row_descriptor, sizeof(row_descriptor));
//
// these variables are for error handling
//
uint num_files_created = 0;
uint num_DB_opened = 0;
//
// number of DB files we have open currently, before add_index is executed
//
......@@ -5467,21 +5497,18 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
ulonglong num_processed = 0; //variable that stores number of elements inserted thus far
thd_proc_info(thd, "Adding indexes");
newname_len = get_max_dict_name_path_length(share->table_name);
newname = (char *)my_malloc(newname_len, MYF(MY_WME));
tmp_key_buff = (uchar *)my_malloc(2*table_arg->s->rec_buff_length, MYF(MY_WME));
tmp_prim_key_buff = (uchar *)my_malloc(2*table_arg->s->rec_buff_length, MYF(MY_WME));
tmp_record = table->record[0];
row_desc_buff = (uchar *)my_malloc(2*(table_share->fields * 6)+10 ,MYF(MY_WME));
if (newname == NULL ||
tmp_key_buff == NULL ||
tmp_prim_key_buff == NULL ||
tmp_record == NULL ||
row_desc_buff == NULL) {
if (tmp_key_buff == NULL ||
tmp_prim_key_buff == NULL ) {
error = ENOMEM;
goto cleanup;
}
error = db_env->txn_begin(db_env, 0, &txn, 0);
if (error) { goto cleanup; }
//
......@@ -5509,38 +5536,9 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
//
// first create all the DB's files
//
row_descriptor.data = row_desc_buff;
char dict_name[MAX_DICT_NAME_LEN];
for (uint i = 0; i < num_of_keys; i++) {
int flags = (key_info[i].flags & HA_CLUSTERING) ? 0 : DB_DUP + DB_DUPSORT;
sprintf(dict_name, "key-%s", key_info[i].name);
make_name(newname, share->table_name, dict_name);
fn_ret = fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME|MY_SAFE_PATH);
if (fn_ret == NULL) {
error = HA_ERR_INTERNAL_ERROR;
goto cleanup;
}
//
// setup the row descriptor
//
row_descriptor.size = create_toku_key_descriptor(
row_desc_buff,
false,
key_info[i].flags & HA_CLUSTERING,
&key_info[i],
hidden_primary_key,
hidden_primary_key ? NULL : &table_share->key_info[primary_key]
);
error = create_sub_table(name_buff, flags, &row_descriptor);
if (tokudb_debug & TOKUDB_DEBUG_OPEN) {
TOKUDB_TRACE("create:%s:flags=%ld:error=%d\n", newname, key_info[i].flags, error);
}
error = create_secondary_dictionary(share->table_name, table_arg, &key_info[i], txn);
if (error) { goto cleanup; }
num_files_created++;
}
for (uint i = 0; i < table_share->keys + test(hidden_primary_key); i++) {
}
......@@ -5573,25 +5571,18 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
}
error = open_secondary_table(
error = open_secondary_dictionary(
&share->key_file[curr_index],
&key_info[i],
share->table_name,
0,
&share->key_type[curr_index]
2, // TODO: This is a hack. Need to learn what should really be here. Need to ask Yoni
&share->key_type[curr_index],
txn
);
if (error) { goto cleanup; }
num_DB_opened++;
}
//
// scan primary table, create each secondary key, add to each DB
//
error = db_env->txn_begin(db_env, 0, &txn, 0);
if (error) { goto cleanup; }
//
// grab some locks to make this go faster
// first a global read lock on the main DB, because
......@@ -5620,6 +5611,9 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
if (error) { goto cleanup; }
}
//
// scan primary table, create each secondary key, add to each DB
//
if ((error = share->file->cursor(share->file, txn, &tmp_cursor, 0))) {
tmp_cursor = NULL; // Safety
goto cleanup;
......@@ -5727,48 +5721,42 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
tmp_cursor = NULL;
}
error = txn->commit(txn, 0);
txn = NULL;
assert(error == 0);
//
// now write stuff to status.tokudb
//
pthread_mutex_lock(&share->mutex);
for (uint i = 0; i < num_of_keys; i++) {
write_key_name_to_status(share->status_block, key_info[i].name, txn);
}
pthread_mutex_unlock(&share->mutex);
error = 0;
cleanup:
if (error) {
if (tmp_cursor) {
tmp_cursor->c_close(tmp_cursor);
tmp_cursor = NULL;
}
if (txn) {
//
// in the case of any error anywhere, we can just nuke all the files created, so we dont need
// to be tricky and try to roll back changes. That is why we commit the transaction,
// which should be fast. The DB is going to go away anyway, so no pt in trying to keep
// it in a good state.
//
txn->commit(txn, 0);
}
//
// We need to delete all the files that may have been created
// The DB's must be closed and removed
//
for (uint i = curr_num_DBs; i < curr_num_DBs + num_DB_opened; i++) {
share->key_file[i]->close(share->key_file[i], 0);
share->key_file[i] = NULL;
}
for (uint i = 0; i < num_files_created; i++) {
DB* tmp;
sprintf(dict_name, "key-%s", key_info[i].name);
make_name(newname, share->table_name, dict_name);
fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME|MY_SAFE_PATH);
if (!(db_create(&tmp, db_env, 0))) {
tmp->remove(tmp, name_buff, NULL, 0);
if (tmp_cursor) {
tmp_cursor->c_close(tmp_cursor);
tmp_cursor = NULL;
}
if (txn) {
if (error) {
curr_index = curr_num_DBs;
for (uint i = 0; i < num_of_keys; i++, curr_index++) {
if (share->key_file[curr_index]) {
share->key_file[curr_index]->close(
share->key_file[curr_index],
0
);
share->key_file[curr_index] = NULL;
}
}
abort_txn(txn);
}
else {
commit_txn(txn,0);
}
}
my_free(newname,MYF(MY_ALLOW_ZERO_PTR));
my_free(tmp_key_buff,MYF(MY_ALLOW_ZERO_PTR));
my_free(tmp_prim_key_buff,MYF(MY_ALLOW_ZERO_PTR));
my_free(row_desc_buff,MYF(MY_ALLOW_ZERO_PTR));
TOKUDB_DBUG_RETURN(error);
}
......@@ -5792,58 +5780,31 @@ cleanup:
int ha_tokudb::prepare_drop_index(TABLE *table_arg, uint *key_num, uint num_of_keys) {
TOKUDB_DBUG_ENTER("ha_tokudb::prepare_drop_index");
int error;
char name_buff[FN_REFLEN];
char* newname = NULL;
char* fn_ret = NULL;
char dict_name[MAX_DICT_NAME_LEN];
DB** dbs_to_remove = NULL;
DB_TXN* txn = NULL;
newname = (char *)my_malloc(
get_max_dict_name_path_length(share->table_name),
MYF(MY_WME|MY_ZEROFILL)
);
if (newname == NULL) {
error = ENOMEM;
goto cleanup;
}
//
// we allocate an array of DB's here to get ready for removal
// We do this so that all potential memory allocation errors that may occur
// will do so BEFORE we go about dropping any indexes. This way, we
// can fail gracefully without losing integrity of data in such cases. If on
// on the other hand, we started removing DB's, and in the middle,
// one failed, it is not immediately obvious how one would rollback
//
dbs_to_remove = (DB **)my_malloc(sizeof(*dbs_to_remove)*num_of_keys, MYF(MY_ZEROFILL));
if (dbs_to_remove == NULL) {
error = ENOMEM;
goto cleanup;
}
for (uint i = 0; i < num_of_keys; i++) {
error = db_create(&dbs_to_remove[i], db_env, 0);
if (error) {
goto cleanup;
}
}
error = db_env->txn_begin(db_env, 0, &txn, 0);
if (error) { goto cleanup; }
for (uint i = 0; i < num_of_keys; i++) {
uint curr_index = key_num[i];
share->key_file[curr_index]->close(share->key_file[curr_index],0);
share->key_file[curr_index] = NULL;
sprintf(dict_name, "key-%s", table_arg->key_info[curr_index].name);
make_name(newname, share->table_name, dict_name);
fn_ret = fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME|MY_SAFE_PATH);
if (fn_ret == NULL) {
error = HA_ERR_INTERNAL_ERROR;
goto cleanup;
}
dbs_to_remove[i]->remove(dbs_to_remove[i], name_buff, NULL, 0);
error = remove_key_name_from_status(share->status_block, table_arg->key_info[curr_index].name, txn);
if (error) { goto cleanup; }
error = delete_or_rename_dictionary(share->table_name, NULL, table_arg->key_info[curr_index].name, true, txn, true);
if (error) { goto cleanup; }
}
cleanup:
my_free(dbs_to_remove, MYF(MY_ALLOW_ZERO_PTR));
my_free(newname, MYF(MY_ALLOW_ZERO_PTR));
if (txn) {
if (error) {
abort_txn(txn);
}
else {
commit_txn(txn,0);
}
}
TOKUDB_DBUG_RETURN(error);
}
......@@ -5979,11 +5940,66 @@ cleanup:
tmp_cursor = NULL;
}
if (do_commit) {
error = txn->commit(txn, 0);
commit_txn(txn, 0);
}
TOKUDB_DBUG_RETURN(error);
}
//
// truncates the dictionary associated with index keynr, using transaction txn;
// does so by deleting and then recreating the dictionary in the context
// of a transaction
//
int ha_tokudb::truncate_dictionary( uint keynr, DB_TXN* txn ) {
int error;
bool is_pk = (keynr == primary_key);
error = share->key_file[keynr]->close(share->key_file[keynr], 0);
if (error) { goto cleanup; }
share->key_file[keynr] = NULL;
if (is_pk) { share->file = NULL; }
if (is_pk) {
error = delete_or_rename_dictionary(
share->table_name,
NULL,
"main",
false, //is key
txn,
true // is a delete
);
if (error) { goto cleanup; }
}
else {
error = delete_or_rename_dictionary(
share->table_name,
NULL,
table_share->key_info[keynr].name,
true, //is key
txn,
true // is a delete
);
if (error) { goto cleanup; }
}
if (is_pk) {
error = create_main_dictionary(share->table_name, table, txn);
}
else {
error = create_secondary_dictionary(
share->table_name,
table,
&table_share->key_info[keynr],
txn
);
}
if (error) { goto cleanup; }
cleanup:
return error;
}
// delete all rows from a table
//
// effects: delete all of the rows in the main dictionary and all of the
......@@ -5997,41 +6013,64 @@ int ha_tokudb::delete_all_rows() {
TOKUDB_DBUG_ENTER("delete_all_rows");
int error = 0;
uint curr_num_DBs = 0;
DB_TXN* txn = NULL;
error = db_env->txn_begin(db_env, 0, &txn, 0);
if (error) { goto cleanup; }
if (thd_sql_command(ha_thd()) != SQLCOM_TRUNCATE) {
error = HA_ERR_WRONG_COMMAND;
goto cleanup;
}
//
// prelock so we know right away if there are any potential
// deadlocks
//
error = acquire_table_lock(transaction, lock_write);
if (error) {
goto cleanup;
}
// truncate all dictionaries
curr_num_DBs = table->s->keys + test(hidden_primary_key);
for (uint i = 0; i < curr_num_DBs; i++) {
DB *db = share->key_file[i];
u_int32_t row_count = 0;
error = db->truncate(db, transaction, &row_count, DB_TRUNCATE_WITHCURSORS);
if (error) {
break;
}
// do something with the row_count?
if (tokudb_debug) {
TOKUDB_TRACE("row_count=%u\n", row_count);
}
error = truncate_dictionary(i, txn);
if (error) { goto cleanup; }
}
// zap the row count
if (error == 0) {
share->rows = 0;
}
cleanup:
if (txn) {
if (error) {
abort_txn(txn);
}
else {
commit_txn(txn,0);
}
}
//
// regardless of errors, need to reopen the DB's
//
for (uint i = 0; i < curr_num_DBs; i++) {
int r = 0;
if (share->key_file[i] == NULL) {
if (i != primary_key) {
r = open_secondary_dictionary(
&share->key_file[i],
&table_share->key_info[i],
share->table_name,
2, // TODO: This is a hack. Need to learn what should really be here. Need to ask Yoni
&share->key_type[i],
NULL
);
assert(!r);
}
else {
r = open_main_dictionary(
share->table_name,
2, // TODO: This is a hack. Need to learn what should really be here. Need to ask Yoni
NULL
);
assert(!r);
}
}
}
TOKUDB_DBUG_RETURN(error);
}
......
......@@ -97,6 +97,7 @@ typedef ulonglong HA_METADATA_KEY;
#define hatoku_capabilities 1
#define hatoku_max_ai 2 //maximum auto increment value found so far
#define hatoku_ai_create_value 3
#define hatoku_key_name 4
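With hatoku_key_name added, the per-table status dictionary records every secondary index by name, which is what makes the cursor-driven drop/rename possible. Illustratively, for a table with secondary indexes idx_a and idx_b, status.tokudb would contain:

//   key                               value
//   (hatoku_version)                  HA_TOKU_VERSION
//   (hatoku_capabilities)             HA_TOKU_CAP
//   (hatoku_ai_create_value)          auto_increment_value from CREATE TABLE
//   (hatoku_key_name, "idx_a\0")      (empty)
//   (hatoku_key_name, "idx_b\0")      (empty)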
typedef struct st_filter_key_part_info {
uint offset;
......@@ -258,14 +259,19 @@ private:
int handle_cursor_error(int error, int err_to_return, uint keynr);
DBT *get_pos(DBT * to, uchar * pos);
int open_secondary_table(DB** ptr, KEY* key_info, const char* name, int mode, u_int32_t* key_type);
int open_main_dictionary(const char* name, int mode, DB_TXN* txn);
int open_secondary_dictionary(DB** ptr, KEY* key_info, const char* name, int mode, u_int32_t* key_type, DB_TXN* txn);
int open_status_dictionary(DB** ptr, const char* name, DB_TXN* txn);
int acquire_table_lock (DB_TXN* trans, TABLE_LOCK_TYPE lt);
int estimate_num_rows(DB* db, u_int64_t* num_rows);
bool has_auto_increment_flag(uint* index);
int write_to_status(DB* db, HA_METADATA_KEY curr_key_data, void* data, uint size );
int write_metadata(DB* db, void* key, uint key_size, void* data, uint data_size );
int write_to_status(DB* db, HA_METADATA_KEY curr_key_data, void* data, uint size, DB_TXN* txn );
int write_metadata(DB* db, void* key, uint key_size, void* data, uint data_size, DB_TXN* txn );
int remove_metadata(DB* db, void* key_data, uint key_size, DB_TXN* transaction);
int update_max_auto_inc(DB* db, ulonglong val);
int write_auto_inc_create(DB* db, ulonglong val);
int remove_key_name_from_status(DB* status_block, char* key_name, DB_TXN* txn);
int write_key_name_to_status(DB* status_block, char* key_name, DB_TXN* txn);
int write_auto_inc_create(DB* db, ulonglong val, DB_TXN* txn);
void init_auto_increment();
int initialize_share(
const char* name,
......@@ -276,7 +282,12 @@ private:
int prelock_range ( const key_range *start_key, const key_range *end_key);
int create_txn(THD* thd, tokudb_trx_data* trx);
bool may_table_be_empty();
int delete_or_rename_table (const char* from_name, const char* to_name, bool is_delete);
int delete_or_rename_dictionary( const char* from_name, const char* to_name, char* index_name, bool is_key, DB_TXN* txn, bool is_delete);
int truncate_dictionary( uint keynr, DB_TXN* txn );
int create_secondary_dictionary(const char* name, TABLE* form, KEY* key_info, DB_TXN* txn);
int create_main_dictionary(const char* name, TABLE* form, DB_TXN* txn);
void trace_create_table_info(const char *name, TABLE * form);
public:
......
......@@ -114,11 +114,6 @@ static void reset_stmt_progress (tokudb_stmt_progress* val) {
static int get_name_length(const char *name) {
int n = 0;
const char *newname = name;
if (tokudb_data_dir) {
n += strlen(tokudb_data_dir) + 1;
if (strncmp("./", name, 2) == 0)
newname = name + 2;
}
n += strlen(newname);
n += strlen(ha_tokudb_ext);
return n;
......@@ -130,25 +125,37 @@ static int get_name_length(const char *name) {
static int get_max_dict_name_path_length(const char *tablename) {
int n = 0;
n += get_name_length(tablename);
n += 1; //for the '/'
n += 1; //for the '-'
n += MAX_DICT_NAME_LEN;
n += strlen(ha_tokudb_ext);
return n;
}
static void make_name(char *newname, const char *tablename, const char *dictname) {
const char *newtablename = tablename;
char *nn = newname;
if (tokudb_data_dir) {
nn += sprintf(nn, "%s/", tokudb_data_dir);
if (strncmp("./", tablename, 2) == 0)
newtablename = tablename + 2;
assert(tablename);
assert(dictname);
nn += sprintf(nn, "%s", newtablename);
nn += sprintf(nn, "-%s", dictname);
}
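With the tokudb_data_dir and per-table-directory handling removed, make_name now produces a flat "<table>-<dictname>" string for the environment to resolve. For example (paths illustrative):

char name[FN_REFLEN];
make_name(name, "./test/t1", "main");       // name == "./test/t1-main"
make_name(name, "./test/t1", "status");     // name == "./test/t1-status"
// callers build secondary-index dictionary names with a "key-" prefix:
char dict_name[MAX_DICT_NAME_LEN];
sprintf(dict_name, "key-%s", "idx_a");
make_name(name, "./test/t1", dict_name);    // name == "./test/t1-key-idx_a"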
static inline void commit_txn(DB_TXN* txn, u_int32_t flags) {
int r;
r = txn->commit(txn, flags);
if (r != 0) {
sql_print_error("tried committing transaction 0x%x and got error code %d", txn, r);
}
nn += sprintf(nn, "%s%s", newtablename, ha_tokudb_ext);
if (dictname)
nn += sprintf(nn, "/%s%s", dictname, ha_tokudb_ext);
assert(r == 0);
}
static inline void abort_txn(DB_TXN* txn) {
int r;
r = txn->abort(txn);
if (r != 0) {
sql_print_error("tried aborting transaction 0x%x and got error code %d", txn, r);
}
assert(r == 0);
}
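commit_txn and abort_txn centralize the commit-and-assert pattern that this change removes from dozens of call sites; a call site now reads:

// before this change, every call site checked by hand:
//     int r = txn->commit(txn, 0);
//     assert(r == 0);
// after, a call site is just:
commit_txn(txn, 0);   // logs via sql_print_error and asserts on failure
// (rollback paths use abort_txn(txn) the same way)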
#endif
......@@ -31,7 +31,7 @@ extern "C" {
#undef HAVE_DTRACE
#undef _DTRACE_VERSION
#define TOKU_METADB_NAME ".\\tokudb_meta.tokudb"
#define TOKU_METADB_NAME "tokudb_meta"
static inline void *thd_data_get(THD *thd, int slot) {
return thd->ha_data[slot].ha_ptr;
......@@ -300,12 +300,17 @@ static int tokudb_init_func(void *p) {
r= metadata_db->open(metadata_db, 0, TOKU_METADB_NAME, NULL, DB_BTREE, DB_THREAD|DB_AUTO_COMMIT, 0);
if (r) {
sql_print_error("No metadata table exists, so creating it");
if (r != ENOENT) {
sql_print_error("Got error %d when trying to open metadata_db", r);
goto error;
}
sql_print_warning("No metadata table exists, so creating it");
r= metadata_db->open(metadata_db, NULL, TOKU_METADB_NAME, NULL, DB_BTREE, DB_THREAD | DB_CREATE, my_umask);
if (r) {
goto error;
}
metadata_db->close(metadata_db,0);
r = metadata_db->close(metadata_db,0);
assert(r == 0);
r = db_create(&metadata_db, db_env, 0);
if (r) {
DBUG_PRINT("info", ("failed to create metadata db %d\n", r));
......@@ -435,13 +440,14 @@ static int tokudb_commit(handlerton * hton, THD * thd, bool all) {
u_int32_t syncflag = THDVAR(thd, commit_sync) ? 0 : DB_TXN_NOSYNC;
tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(thd, hton->slot);
DB_TXN **txn = all ? &trx->all : &trx->stmt;
int error = 0;
if (*txn) {
if (tokudb_debug & TOKUDB_DEBUG_TXN)
if (tokudb_debug & TOKUDB_DEBUG_TXN) {
TOKUDB_TRACE("commit:%d:%p\n", all, *txn);
error = (*txn)->commit(*txn, syncflag);
if (*txn == trx->sp_level)
}
commit_txn(*txn, syncflag);
if (*txn == trx->sp_level) {
trx->sp_level = 0;
}
*txn = 0;
}
else if (tokudb_debug & TOKUDB_DEBUG_TXN) {
......@@ -451,7 +457,7 @@ static int tokudb_commit(handlerton * hton, THD * thd, bool all) {
trx->iso_level = hatoku_iso_not_set;
}
reset_stmt_progress(&trx->stmt_progress);
TOKUDB_DBUG_RETURN(error);
TOKUDB_DBUG_RETURN(0);
}
static int tokudb_rollback(handlerton * hton, THD * thd, bool all) {
......@@ -459,14 +465,15 @@ static int tokudb_rollback(handlerton * hton, THD * thd, bool all) {
DBUG_PRINT("trans", ("aborting transaction %s", all ? "all" : "stmt"));
tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(thd, hton->slot);
DB_TXN **txn = all ? &trx->all : &trx->stmt;
int error = 0;
if (*txn) {
if (tokudb_debug & TOKUDB_DEBUG_TXN)
if (tokudb_debug & TOKUDB_DEBUG_TXN) {
TOKUDB_TRACE("rollback:%p\n", *txn);
error = (*txn)->abort(*txn);
if (*txn == trx->sp_level)
trx->sp_level = 0;
*txn = 0;
}
abort_txn(*txn);
if (*txn == trx->sp_level) {
trx->sp_level = 0;
}
*txn = 0;
}
else {
if (tokudb_debug & TOKUDB_DEBUG_TXN) {
......@@ -477,7 +484,7 @@ static int tokudb_rollback(handlerton * hton, THD * thd, bool all) {
trx->iso_level = hatoku_iso_not_set;
}
reset_stmt_progress(&trx->stmt_progress);
TOKUDB_DBUG_RETURN(error);
TOKUDB_DBUG_RETURN(0);
}
#if 0
......@@ -570,8 +577,6 @@ static bool tokudb_show_data_size(THD * thd, stat_print_fn * stat_print, bool ex
if (!error) {
char* name = (char *)curr_key.data;
char* newname = NULL;
char name_buff[FN_REFLEN];
char* fn_ret = NULL;
u_int64_t curr_num_bytes = 0;
DB_BTREE_STAT64 dict_stats;
......@@ -585,12 +590,11 @@ static bool tokudb_show_data_size(THD * thd, stat_print_fn * stat_print, bool ex
}
make_name(newname, name, "main");
fn_ret = fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME|MY_SAFE_PATH);
error = db_create(&curr_db, db_env, 0);
if (error) { goto cleanup; }
error = curr_db->open(curr_db, 0, name_buff, NULL, DB_BTREE, DB_THREAD, 0);
error = curr_db->open(curr_db, 0, newname, NULL, DB_BTREE, DB_THREAD, 0);
if (error == ENOENT) { error = 0; continue; }
if (error) { goto cleanup; }
......@@ -678,7 +682,7 @@ cleanup:
tmp_table_cursor->c_close(tmp_table_cursor);
}
if (txn) {
txn->commit(txn, 0);
commit_txn(txn, 0);
}
if (error) {
sql_print_error("got an error %d in show_data_size\n", error);
......@@ -763,8 +767,8 @@ static bool tokudb_show_engine_status(THD * thd, stat_print_fn * stat_print) {
STATPRINT("logger lock", lockstat);
STATPRINT("logger lock counter", buf);
lockstat = (engstat.cachetable_lock_ctr & 0x01) ? "Locked" : "Unlocked";
lockctr = engstat.cachetable_lock_ctr >> 1; // lsb indicates if locked
//lockstat = (engstat.cachetable_lock_ctr & 0x01) ? "Locked" : "Unlocked";
//lockctr = engstat.cachetable_lock_ctr >> 1; // lsb indicates if locked
sprintf(buf, "%" PRIu32, lockctr);
STATPRINT("cachetable lock", lockstat);
STATPRINT("cachetable lock counter", buf);
......