Commit f4622e9d authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

refs #4743 rollback alter table add key with partitions

git-svn-id: file:///svn/mysql/tokudb-engine/tokudb-engine@46436 c7de825b-a66e-492c-adef-691d508d4ae1
parent e4530af1
......@@ -7765,7 +7765,7 @@ volatile int ha_tokudb_drop_indexes_wait = 0; // debug
// Internal function called by ha_tokudb::prepare_drop_index and ha_tokudb::alter_table_phase2
// With a transaction, drops dictionaries associated with indexes in key_num
//
int ha_tokudb::drop_indexes(TABLE *table_arg, uint *key_num, uint num_of_keys, DB_TXN* txn) {
int ha_tokudb::drop_indexes(TABLE *table_arg, uint *key_num, uint num_of_keys, KEY *key_info, DB_TXN* txn) {
TOKUDB_DBUG_ENTER("ha_tokudb::drop_indexes");
assert(txn);
......@@ -7787,10 +7787,10 @@ int ha_tokudb::drop_indexes(TABLE *table_arg, uint *key_num, uint num_of_keys, D
assert(r==0);
share->key_file[curr_index] = NULL;
error = remove_key_name_from_status(share->status_block, table_arg->key_info[curr_index].name, txn);
error = remove_key_name_from_status(share->status_block, key_info[curr_index].name, txn);
if (error) { goto cleanup; }
error = delete_or_rename_dictionary(share->table_name, NULL, table_arg->key_info[curr_index].name, true, txn, true);
error = delete_or_rename_dictionary(share->table_name, NULL, key_info[curr_index].name, true, txn, true);
if (error) { goto cleanup; }
}
......
......@@ -593,7 +593,7 @@ class ha_tokudb : public handler {
bool* modified_DB
);
void restore_add_index(TABLE* table_arg, uint num_of_keys, bool incremented_numDBs, bool modified_DBs);
int drop_indexes(TABLE *table_arg, uint *key_num, uint num_of_keys, DB_TXN* txn);
int drop_indexes(TABLE *table_arg, uint *key_num, uint num_of_keys, KEY *key_info, DB_TXN* txn);
void restore_drop_indexes(TABLE *table_arg, uint *key_num, uint num_of_keys);
public:
......
......@@ -66,7 +66,7 @@ ha_tokudb::prepare_drop_index(TABLE *table_arg, uint *key_num, uint num_of_keys)
error = db_env->txn_begin(db_env, 0, &txn, 0);
if (error) { goto cleanup; }
error = drop_indexes(table_arg, key_num, num_of_keys, txn);
error = drop_indexes(table_arg, key_num, num_of_keys, table_arg->key_info, txn);
if (error) { goto cleanup; }
cleanup:
......@@ -457,7 +457,7 @@ ha_tokudb::alter_table_phase2(
// drop indexes
if (dropping_indexes) {
error = drop_indexes(table, alter_info->index_drop_buffer, alter_info->index_drop_count, txn);
error = drop_indexes(table, alter_info->index_drop_buffer, alter_info->index_drop_count, table->key_info, txn);
if (error) { goto cleanup; }
}
......
......@@ -111,7 +111,7 @@ ha_tokudb::prepare_drop_index(TABLE *table_arg, uint *key_num, uint num_of_keys)
DB_TXN *txn = transaction;
assert(txn);
int error = drop_indexes(table_arg, key_num, num_of_keys, txn);
int error = drop_indexes(table_arg, key_num, num_of_keys, table_arg->key_info, txn);
DBUG_EXECUTE_IF("prepare_drop_index_fail", {
error = 1;
});
......
......@@ -92,11 +92,13 @@ is_disjoint_add_drop(Alter_inplace_info *ha_alter_info) {
class tokudb_alter_ctx : public inplace_alter_handler_ctx {
public:
tokudb_alter_ctx() {
alter_txn = NULL;
add_index_changed = false;
drop_index_changed = false;
compression_changed = false;
}
public:
DB_TXN *alter_txn;
bool add_index_changed;
bool incremented_num_DBs, modified_DBs;
bool drop_index_changed;
......@@ -123,9 +125,6 @@ ha_tokudb::check_if_supported_inplace_alter(TABLE *altered_table, Alter_inplace_
enum_alter_inplace_result result = HA_ALTER_INPLACE_NOT_SUPPORTED; // default is NOT inplace
HA_CREATE_INFO *create_info = ha_alter_info->create_info;
ha_alter_info->handler_ctx = new tokudb_alter_ctx;
assert(ha_alter_info->handler_ctx);
ulong handler_flags = fix_handler_flags(ha_alter_info, table, altered_table);
// always allow rename table + any other operation, so turn off the handler flag
......@@ -221,12 +220,19 @@ ha_tokudb::check_if_supported_inplace_alter(TABLE *altered_table, Alter_inplace_
DBUG_RETURN(result);
}
// prepare not yet called by the mysql 5.5 patch
bool
ha_tokudb::prepare_inplace_alter_table(TABLE *altered_table, Alter_inplace_info *ha_alter_info) {
TOKUDB_DBUG_ENTER("prepare_inplace_alter_table");
bool result = false; // success
assert(ha_alter_info->handler_ctx == NULL);
tokudb_alter_ctx *ctx = new tokudb_alter_ctx;
assert(ctx);
ha_alter_info->handler_ctx = ctx;
int r = db_env->txn_begin(db_env, 0, &ctx->alter_txn, 0);
if (r != 0)
result = true; // fail
DBUG_RETURN(result);
}
......@@ -235,6 +241,7 @@ ha_tokudb::inplace_alter_table(TABLE *altered_table, Alter_inplace_info *ha_alte
TOKUDB_DBUG_ENTER("inplace_alter_table");
int error = 0;
tokudb_alter_ctx *ctx = static_cast<tokudb_alter_ctx *>(ha_alter_info->handler_ctx);
HA_CREATE_INFO *create_info = ha_alter_info->create_info;
ulong handler_flags = fix_handler_flags(ha_alter_info, table, altered_table);
......@@ -249,24 +256,24 @@ ha_tokudb::inplace_alter_table(TABLE *altered_table, Alter_inplace_info *ha_alte
error = alter_table_add_or_drop_column(altered_table, ha_alter_info);
}
if (error == 0 && (handler_flags & Alter_inplace_info::CHANGE_CREATE_OPTION) && (create_info->used_fields & HA_CREATE_USED_AUTO)) {
error = write_auto_inc_create(share->status_block, create_info->auto_increment_value, transaction);
error = write_auto_inc_create(share->status_block, create_info->auto_increment_value, ctx->alter_txn);
}
if (error == 0 && (handler_flags & Alter_inplace_info::CHANGE_CREATE_OPTION) && (create_info->used_fields & HA_CREATE_USED_ROW_FORMAT)) {
enum toku_compression_method method = row_type_to_compression_method(create_info->row_type);
// Get the current type
tokudb_alter_ctx *ctx = static_cast<tokudb_alter_ctx *>(ha_alter_info->handler_ctx);
DB *db = share->key_file[0];
error = db->get_compression_method(db, &ctx->orig_compression_method);
assert(error == 0);
ctx->compression_changed = true;
// Set the new type.
enum toku_compression_method method = row_type_to_compression_method(create_info->row_type);
uint32_t curr_num_DBs = table->s->keys + test(hidden_primary_key);
for (uint32_t i = 0; i < curr_num_DBs; i++) {
db = share->key_file[i];
error = db->change_compression_method(db, method);
if (error)
break;
ctx->compression_changed = true;
}
}
......@@ -293,7 +300,7 @@ ha_tokudb::alter_table_add_index(TABLE *altered_table, Alter_inplace_info *ha_al
tokudb_alter_ctx *ctx = static_cast<tokudb_alter_ctx *>(ha_alter_info->handler_ctx);
ctx->add_index_changed = true;
int error = tokudb_add_index(table, key_info, ha_alter_info->index_add_count, transaction, &ctx->incremented_num_DBs, &ctx->modified_DBs);
int error = tokudb_add_index(table, key_info, ha_alter_info->index_add_count, ctx->alter_txn, &ctx->incremented_num_DBs, &ctx->modified_DBs);
if (error == HA_ERR_FOUND_DUPP_KEY) {
// hack for now, in case of duplicate key error,
// because at the moment we cannot display the right key
......@@ -307,24 +314,54 @@ ha_tokudb::alter_table_add_index(TABLE *altered_table, Alter_inplace_info *ha_al
return error;
}
static bool find_index_of_key(const char *key_name, TABLE *table, uint *index_offset_ptr) {
for (uint i = 0; i < table->s->keys; i++) {
if (strcmp(key_name, table->key_info[i].name) == 0) {
*index_offset_ptr = i;
return true;
}
}
return false;
}
static bool find_index_of_key(const char *key_name, KEY *key_info, uint key_count, uint *index_offset_ptr) {
for (uint i = 0; i < key_count; i++) {
if (strcmp(key_name, key_info[i].name) == 0) {
*index_offset_ptr = i;
return true;
}
}
return false;
}
int
ha_tokudb::alter_table_drop_index(TABLE *altered_table, Alter_inplace_info *ha_alter_info) {
// translate KEY pointers to indexes into the key_info array
KEY *key_info = table->key_info;
// translate key names to indexes into the key_info array
uint index_drop_offsets[ha_alter_info->index_drop_count];
for (uint i = 0; i < ha_alter_info->index_drop_count; i++)
index_drop_offsets[i] = ha_alter_info->index_drop_buffer[i] - table->key_info;
for (uint i = 0; i < ha_alter_info->index_drop_count; i++) {
bool found;
found = find_index_of_key(ha_alter_info->index_drop_buffer[i]->name, table, &index_drop_offsets[i]);
if (!found) {
// undo of add key in partition engine
found = find_index_of_key(ha_alter_info->index_drop_buffer[i]->name, ha_alter_info->key_info_buffer, ha_alter_info->key_count, &index_drop_offsets[i]);
assert(found);
key_info = ha_alter_info->key_info_buffer;
}
}
// drop indexes
tokudb_alter_ctx *ctx = static_cast<tokudb_alter_ctx *>(ha_alter_info->handler_ctx);
ctx->drop_index_changed = true;
int error = drop_indexes(table, index_drop_offsets, ha_alter_info->index_drop_count, transaction);
int error = drop_indexes(table, index_drop_offsets, ha_alter_info->index_drop_count, key_info, ctx->alter_txn);
return error;
}
int
ha_tokudb::alter_table_add_or_drop_column(TABLE *altered_table, Alter_inplace_info *ha_alter_info) {
tokudb_alter_ctx *ctx = static_cast<tokudb_alter_ctx *>(ha_alter_info->handler_ctx);
int error;
uchar *column_extra = NULL;
uchar *row_desc_buff = NULL;
......@@ -413,7 +450,7 @@ ha_tokudb::alter_table_add_or_drop_column(TABLE *altered_table, Alter_inplace_in
}
error = share->key_file[i]->change_descriptor(
share->key_file[i],
transaction,
ctx->alter_txn,
&row_descriptor,
0
);
......@@ -437,7 +474,7 @@ ha_tokudb::alter_table_add_or_drop_column(TABLE *altered_table, Alter_inplace_in
DBUG_ASSERT(num_column_extra <= max_column_extra_size);
error = share->key_file[i]->update_broadcast(
share->key_file[i],
transaction,
ctx->alter_txn,
&column_dbt,
DB_IS_RESETTING_OP
);
......@@ -457,54 +494,47 @@ bool
ha_tokudb::commit_inplace_alter_table(TABLE *altered_table, Alter_inplace_info *ha_alter_info, bool commit) {
TOKUDB_DBUG_ENTER("commit_inplace_alter_table");
tokudb_alter_ctx *ctx = static_cast<tokudb_alter_ctx *>(ha_alter_info->handler_ctx);
bool result = false; // success
if (commit) {
if (altered_table->part_info == NULL) {
// read frmdata for the altered table
// read frm data for the altered table
uchar *frm_data; size_t frm_len;
int error = readfrm(altered_table->s->path.str, &frm_data, &frm_len);
if (error) {
result = true;
} else {
// transactionally write frmdata to status
assert(transaction);
error = write_to_status(share->status_block, hatoku_frm_data, (void *)frm_data, (uint)frm_len, transaction);
if (error) {
result = true;
}
if (!error) {
// transactionally write frm data to status
error = write_to_status(share->status_block, hatoku_frm_data, (void *)frm_data, (uint)frm_len, ctx->alter_txn);
my_free(frm_data);
}
if (error)
if (error) {
commit = false;
result = true;
print_error(error, MYF(0));
}
}
}
if (!commit || result == true) {
// abort the transaction NOW so that any alters are rolled back. this allows the following restores to work.
THD *thd = ha_thd();
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
assert(trx && transaction == trx->stmt && transaction == trx->sub_sp_level);
abort_txn(transaction);
transaction = NULL;
trx->stmt = NULL;
trx->sub_sp_level = NULL;
trx->should_abort = false;
tokudb_alter_ctx *ctx = static_cast<tokudb_alter_ctx *>(ha_alter_info->handler_ctx);
if (commit) {
// commit the alter transaction NOW
commit_txn(ctx->alter_txn, 0);
ctx->alter_txn = NULL;
} else {
// abort the alter transaction NOW so that any alters are rolled back. this allows the following restores to work.
abort_txn(ctx->alter_txn);
ctx->alter_txn = NULL;
if (ctx->add_index_changed) {
restore_add_index(table, ha_alter_info->index_add_count, ctx->incremented_num_DBs, ctx->modified_DBs);
}
if (ctx->drop_index_changed) {
// translate KEY pointers to indexes into the key_info array
// translate key names to indexes into the key_info array
uint index_drop_offsets[ha_alter_info->index_drop_count];
for (uint i = 0; i < ha_alter_info->index_drop_count; i++)
index_drop_offsets[i] = ha_alter_info->index_drop_buffer[i] - table->key_info;
for (uint i = 0; i < ha_alter_info->index_drop_count; i++) {
bool found = find_index_of_key(ha_alter_info->index_drop_buffer[i]->name, table, &index_drop_offsets[i]);
assert(found);
}
restore_drop_indexes(table, index_drop_offsets, ha_alter_info->index_drop_count);
}
if (ctx->compression_changed) {
......
......@@ -210,8 +210,9 @@ static inline void make_name(char *newname, const char *tablename, const char *d
}
static inline void commit_txn(DB_TXN* txn, uint32_t flags) {
int r;
r = txn->commit(txn, flags);
if (tokudb_debug & TOKUDB_DEBUG_TXN)
TOKUDB_TRACE("commit_txn %p\n", txn);
int r = txn->commit(txn, flags);
if (r != 0) {
sql_print_error("tried committing transaction %p and got error code %d", txn, r);
}
......@@ -219,8 +220,9 @@ static inline void commit_txn(DB_TXN* txn, uint32_t flags) {
}
static inline void abort_txn(DB_TXN* txn) {
int r;
r = txn->abort(txn);
if (tokudb_debug & TOKUDB_DEBUG_TXN)
TOKUDB_TRACE("abort_txn %p\n", txn);
int r = txn->abort(txn);
if (r != 0) {
sql_print_error("tried aborting transaction %p and got error code %d", txn, r);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment